http://git-wip-us.apache.org/repos/asf/hadoop/blob/8fd55202/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/ErasureCodingWorker.java ---------------------------------------------------------------------- diff --cc hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/ErasureCodingWorker.java index dabae2c,0000000..7c64b37 mode 100644,000000..100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/ErasureCodingWorker.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/ErasureCodingWorker.java @@@ -1,1014 -1,0 +1,1016 @@@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hdfs.server.datanode.erasurecode; + +import java.io.BufferedOutputStream; +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.net.InetSocketAddress; +import java.net.Socket; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.BitSet; +import java.util.Collection; +import java.util.EnumSet; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.Callable; +import java.util.concurrent.CompletionService; +import java.util.concurrent.ExecutorCompletionService; +import java.util.concurrent.Future; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.SynchronousQueue; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +import org.apache.commons.logging.Log; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.StorageType; +import org.apache.hadoop.hdfs.BlockReader; +import org.apache.hadoop.hdfs.DFSConfigKeys; +import org.apache.hadoop.hdfs.DFSPacket; +import org.apache.hadoop.hdfs.DFSUtil; +import org.apache.hadoop.hdfs.DFSUtilClient; +import org.apache.hadoop.hdfs.RemoteBlockReader2; +import org.apache.hadoop.hdfs.net.Peer; +import org.apache.hadoop.hdfs.net.TcpPeerServer; +import org.apache.hadoop.hdfs.protocol.DatanodeID; +import org.apache.hadoop.hdfs.protocol.DatanodeInfo; +import org.apache.hadoop.hdfs.protocol.ExtendedBlock; +import org.apache.hadoop.hdfs.protocol.datatransfer.BlockConstructionStage; +import org.apache.hadoop.hdfs.protocol.datatransfer.IOStreamPair; +import org.apache.hadoop.hdfs.protocol.datatransfer.PacketHeader; +import org.apache.hadoop.hdfs.protocol.datatransfer.Sender; +import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.DataEncryptionKeyFactory; +import 
org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier; +import org.apache.hadoop.hdfs.server.datanode.CachingStrategy; +import org.apache.hadoop.hdfs.server.datanode.DataNode; +import org.apache.hadoop.hdfs.server.protocol.BlockECRecoveryCommand.BlockECRecoveryInfo; +import org.apache.hadoop.hdfs.util.StripedBlockUtil; +import org.apache.hadoop.hdfs.util.StripedBlockUtil.StripingChunkReadResult; +import org.apache.hadoop.io.IOUtils; +import org.apache.hadoop.io.erasurecode.CodecUtil; +import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy; +import org.apache.hadoop.io.erasurecode.rawcoder.RawErasureDecoder; +import org.apache.hadoop.net.NetUtils; +import org.apache.hadoop.security.token.Token; +import org.apache.hadoop.util.Daemon; +import org.apache.hadoop.util.DataChecksum; + +import com.google.common.base.Preconditions; + +import static org.apache.hadoop.hdfs.util.StripedBlockUtil.convertIndex4Decode; + +/** + * ErasureCodingWorker handles the erasure coding recovery work commands. These + * commands would be issued from Namenode as part of Datanode's heart beat + * response. BPOfferService delegates the work to this class for handling EC + * commands. 
+ */ +public final class ErasureCodingWorker { + private static final Log LOG = DataNode.LOG; + + private final DataNode datanode; + private final Configuration conf; + + private ThreadPoolExecutor STRIPED_BLK_RECOVERY_THREAD_POOL; + private ThreadPoolExecutor STRIPED_READ_THREAD_POOL; + private final int STRIPED_READ_TIMEOUT_MILLIS; + private final int STRIPED_READ_BUFFER_SIZE; + + public ErasureCodingWorker(Configuration conf, DataNode datanode) { + this.datanode = datanode; + this.conf = conf; + + STRIPED_READ_TIMEOUT_MILLIS = conf.getInt( + DFSConfigKeys.DFS_DATANODE_STRIPED_READ_TIMEOUT_MILLIS_KEY, + DFSConfigKeys.DFS_DATANODE_STRIPED_READ_TIMEOUT_MILLIS_DEFAULT); + initializeStripedReadThreadPool(conf.getInt( + DFSConfigKeys.DFS_DATANODE_STRIPED_READ_THREADS_KEY, + DFSConfigKeys.DFS_DATANODE_STRIPED_READ_THREADS_DEFAULT)); + STRIPED_READ_BUFFER_SIZE = conf.getInt( + DFSConfigKeys.DFS_DATANODE_STRIPED_READ_BUFFER_SIZE_KEY, + DFSConfigKeys.DFS_DATANODE_STRIPED_READ_BUFFER_SIZE_DEFAULT); + + initializeStripedBlkRecoveryThreadPool(conf.getInt( + DFSConfigKeys.DFS_DATANODE_STRIPED_BLK_RECOVERY_THREADS_KEY, + DFSConfigKeys.DFS_DATANODE_STRIPED_BLK_RECOVERY_THREADS_DEFAULT)); + } + + private RawErasureDecoder newDecoder(int numDataUnits, int numParityUnits) { + return CodecUtil.createRSRawDecoder(conf, numDataUnits, numParityUnits); + } + + private void initializeStripedReadThreadPool(int num) { + if (LOG.isDebugEnabled()) { + LOG.debug("Using striped reads; pool threads=" + num); + } + STRIPED_READ_THREAD_POOL = new ThreadPoolExecutor(1, num, 60, + TimeUnit.SECONDS, new SynchronousQueue<Runnable>(), + new Daemon.DaemonFactory() { + private final AtomicInteger threadIndex = new AtomicInteger(0); + + @Override + public Thread newThread(Runnable r) { + Thread t = super.newThread(r); + t.setName("stripedRead-" + threadIndex.getAndIncrement()); + return t; + } + }, new ThreadPoolExecutor.CallerRunsPolicy() { + @Override + public void rejectedExecution(Runnable runnable, 
ThreadPoolExecutor e) { + LOG.info("Execution for striped reading rejected, " + + "Executing in current thread"); + // will run in the current thread + super.rejectedExecution(runnable, e); + } + }); + STRIPED_READ_THREAD_POOL.allowCoreThreadTimeOut(true); + } + + private void initializeStripedBlkRecoveryThreadPool(int num) { + if (LOG.isDebugEnabled()) { + LOG.debug("Using striped block recovery; pool threads=" + num); + } + STRIPED_BLK_RECOVERY_THREAD_POOL = new ThreadPoolExecutor(2, num, 60, + TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(), + new Daemon.DaemonFactory() { + private final AtomicInteger threadIdx = new AtomicInteger(0); + + @Override + public Thread newThread(Runnable r) { + Thread t = super.newThread(r); + t.setName("stripedBlockRecovery-" + threadIdx.getAndIncrement()); + return t; + } + }); + STRIPED_BLK_RECOVERY_THREAD_POOL.allowCoreThreadTimeOut(true); + } + + /** + * Handles the Erasure Coding recovery work commands. + * + * @param ecTasks + * BlockECRecoveryInfo + */ + public void processErasureCodingTasks(Collection<BlockECRecoveryInfo> ecTasks) { + for (BlockECRecoveryInfo recoveryInfo : ecTasks) { + try { + STRIPED_BLK_RECOVERY_THREAD_POOL + .submit(new ReconstructAndTransferBlock(recoveryInfo)); + } catch (Throwable e) { + LOG.warn("Failed to recover striped block " + + recoveryInfo.getExtendedBlock().getLocalBlock(), e); + } + } + } + + /** + * ReconstructAndTransferBlock recover one or more missed striped block in the + * striped block group, the minimum number of live striped blocks should be + * no less than data block number. + * + * | <- Striped Block Group -> | + * blk_0 blk_1 blk_2(*) blk_3 ... <- A striped block group + * | | | | + * v v v v + * +------+ +------+ +------+ +------+ + * |cell_0| |cell_1| |cell_2| |cell_3| ... + * +------+ +------+ +------+ +------+ + * |cell_4| |cell_5| |cell_6| |cell_7| ... + * +------+ +------+ +------+ +------+ + * |cell_8| |cell_9| |cell10| |cell11| ... 
+ * +------+ +------+ +------+ +------+ + * ... ... ... ... + * + * + * We use the following steps to recover a striped block group. In each round, we + * recover <code>bufferSize</code> bytes of data until finished; the + * <code>bufferSize</code> is configurable and may be smaller or larger than the + * cell size: + * step1: read <code>bufferSize</code> data from the minimum number of sources + * required by recovery. + * step2: decode data for targets. + * step3: transfer data to targets. + * + * In step1, try to read <code>bufferSize</code> data from the minimum number + * of sources; if there are corrupt or stale sources, a read from a new source + * will be scheduled. The best sources are remembered for the next round and + * may be updated in each round. + * + * In step2, the decoder reconstructs the data of the missing (target) blocks + * from the data read in step1, padding short buffers with zeros up to the + * recovery length. Notice we only read once and recover all missed striped + * blocks, even if there is more than one. + * + * In step3, send the recovered data to targets by constructing packets + * and sending them directly. As with contiguous block replication, we + * don't check the packet acks. Since the datanode doing the recovery work + * is one of the source datanodes, the recovered data is sent + * remotely. + * + * There are some points we can improve further in the next phase: + * 1. We can read the block file directly on the local datanode; + * currently we use a remote block reader. (Notice short-circuit read is not + * a good choice, see inline comments). + * 2. Do we need to check the packet acks for EC recovery? Since EC recovery + * is more expensive than contiguous block replication (it needs to + * read from several other datanodes), should we make sure the + * recovered result is received by targets?
+ */ + private class ReconstructAndTransferBlock implements Runnable { + private final int dataBlkNum; + private final int parityBlkNum; + private final int cellSize; + + private RawErasureDecoder decoder; + + // Striped read buffer size + private int bufferSize; + + private final ExtendedBlock blockGroup; + private final int minRequiredSources; + // position in striped internal block + private long positionInBlock; + + // sources + private final short[] liveIndices; + private final DatanodeInfo[] sources; + + private final List<StripedReader> stripedReaders; + + // The buffers and indices for striped blocks whose length is 0 + private ByteBuffer[] zeroStripeBuffers; + private short[] zeroStripeIndices; + + // targets + private final DatanodeInfo[] targets; + private final StorageType[] targetStorageTypes; + + private final short[] targetIndices; + private final ByteBuffer[] targetBuffers; + + private final Socket[] targetSockets; + private final DataOutputStream[] targetOutputStreams; + private final DataInputStream[] targetInputStreams; + + private final long[] blockOffset4Targets; + private final long[] seqNo4Targets; + + private final static int WRITE_PACKET_SIZE = 64 * 1024; + private DataChecksum checksum; + private int maxChunksPerPacket; + private byte[] packetBuf; + private byte[] checksumBuf; + private int bytesPerChecksum; + private int checksumSize; + + private final CachingStrategy cachingStrategy; + + private final Map<Future<Void>, Integer> futures = new HashMap<>(); + private final CompletionService<Void> readService = + new ExecutorCompletionService<>(STRIPED_READ_THREAD_POOL); + + ReconstructAndTransferBlock(BlockECRecoveryInfo recoveryInfo) { + ErasureCodingPolicy ecPolicy = recoveryInfo.getErasureCodingPolicy(); + dataBlkNum = ecPolicy.getNumDataUnits(); + parityBlkNum = ecPolicy.getNumParityUnits(); + cellSize = ecPolicy.getCellSize(); + + blockGroup = recoveryInfo.getExtendedBlock(); + final int cellsNum = (int)((blockGroup.getNumBytes() - 1) 
/ cellSize + 1); + minRequiredSources = Math.min(cellsNum, dataBlkNum); + + liveIndices = recoveryInfo.getLiveBlockIndices(); + sources = recoveryInfo.getSourceDnInfos(); + stripedReaders = new ArrayList<>(sources.length); + + Preconditions.checkArgument(liveIndices.length >= minRequiredSources, + "No enough live striped blocks."); + Preconditions.checkArgument(liveIndices.length == sources.length, + "liveBlockIndices and source dns should match"); + + if (minRequiredSources < dataBlkNum) { + zeroStripeBuffers = + new ByteBuffer[dataBlkNum - minRequiredSources]; + zeroStripeIndices = new short[dataBlkNum - minRequiredSources]; + } + + targets = recoveryInfo.getTargetDnInfos(); + targetStorageTypes = recoveryInfo.getTargetStorageTypes(); + targetIndices = new short[targets.length]; + targetBuffers = new ByteBuffer[targets.length]; + + Preconditions.checkArgument(targetIndices.length <= parityBlkNum, + "Too much missed striped blocks."); + + targetSockets = new Socket[targets.length]; + targetOutputStreams = new DataOutputStream[targets.length]; + targetInputStreams = new DataInputStream[targets.length]; + + blockOffset4Targets = new long[targets.length]; + seqNo4Targets = new long[targets.length]; + + for (int i = 0; i < targets.length; i++) { + blockOffset4Targets[i] = 0; + seqNo4Targets[i] = 0; + } + + getTargetIndices(); + cachingStrategy = CachingStrategy.newDefaultStrategy(); + } + + private ByteBuffer allocateBuffer(int length) { + return ByteBuffer.allocate(length); + } + + private ExtendedBlock getBlock(ExtendedBlock blockGroup, int i) { + return StripedBlockUtil.constructInternalBlock(blockGroup, cellSize, + dataBlkNum, i); + } + + private long getBlockLen(ExtendedBlock blockGroup, int i) { + return StripedBlockUtil.getInternalBlockLength(blockGroup.getNumBytes(), + cellSize, dataBlkNum, i); + } + + /** + * StripedReader is used to read from one source DN, it contains a block + * reader, buffer and striped block index. 
+ * Only allocate StripedReader once for one source, and the StripedReader + * has the same array order with sources. Typically we only need to allocate + * minimum number (minRequiredSources) of StripedReader, and allocate + * new for new source DN if some existing DN invalid or slow. + * If some source DN is corrupt, set the corresponding blockReader to + * null and will never read from it again. + * + * @param i the array index of sources + * @param offsetInBlock offset for the internal block + * @return StripedReader + */ + private StripedReader addStripedReader(int i, long offsetInBlock) { + StripedReader reader = new StripedReader(liveIndices[i]); + stripedReaders.add(reader); + + BlockReader blockReader = newBlockReader( + getBlock(blockGroup, liveIndices[i]), offsetInBlock, sources[i]); + if (blockReader != null) { + initChecksumAndBufferSizeIfNeeded(blockReader); + reader.blockReader = blockReader; + } + reader.buffer = allocateBuffer(bufferSize); + return reader; + } + + @Override + public void run() { + datanode.incrementXmitsInProgress(); + try { + // Store the array indices of source DNs we have read successfully. + // In each iteration of read, the success list may be updated if + // some source DN is corrupted or slow. And use the updated success + // list of DNs for next iteration read. 
+ int[] success = new int[minRequiredSources]; + + int nsuccess = 0; + for (int i = 0; + i < sources.length && nsuccess < minRequiredSources; i++) { + StripedReader reader = addStripedReader(i, 0); + if (reader.blockReader != null) { + success[nsuccess++] = i; + } + } + + if (nsuccess < minRequiredSources) { + String error = "Can't find minimum sources required by " + + "recovery, block id: " + blockGroup.getBlockId(); + throw new IOException(error); + } + + if (zeroStripeBuffers != null) { + for (int i = 0; i < zeroStripeBuffers.length; i++) { + zeroStripeBuffers[i] = allocateBuffer(bufferSize); + } + } + + for (int i = 0; i < targets.length; i++) { + targetBuffers[i] = allocateBuffer(bufferSize); + } + + checksumSize = checksum.getChecksumSize(); + int chunkSize = bytesPerChecksum + checksumSize; + maxChunksPerPacket = Math.max( + (WRITE_PACKET_SIZE - PacketHeader.PKT_MAX_HEADER_LEN)/chunkSize, 1); + int maxPacketSize = chunkSize * maxChunksPerPacket + + PacketHeader.PKT_MAX_HEADER_LEN; + + packetBuf = new byte[maxPacketSize]; + checksumBuf = new byte[checksumSize * (bufferSize / bytesPerChecksum)]; + + // targetsStatus store whether some target is success, it will record + // any failed target once, if some target failed (invalid DN or transfer + // failed), will not transfer data to it any more. + boolean[] targetsStatus = new boolean[targets.length]; + if (initTargetStreams(targetsStatus) == 0) { + String error = "All targets are failed."; + throw new IOException(error); + } + + long firstStripedBlockLength = getBlockLen(blockGroup, 0); + while (positionInBlock < firstStripedBlockLength) { + int toRead = Math.min( + bufferSize, (int)(firstStripedBlockLength - positionInBlock)); + // step1: read from minimum source DNs required for reconstruction. 
+ // The returned success list is the source DNs we do real read from + success = readMinimumStripedData4Recovery(success); + + // step2: decode to reconstruct targets + long remaining = firstStripedBlockLength - positionInBlock; + int toRecoverLen = remaining < bufferSize ? + (int)remaining : bufferSize; + recoverTargets(success, targetsStatus, toRecoverLen); + + // step3: transfer data + if (transferData2Targets(targetsStatus) == 0) { + String error = "Transfer failed for all targets."; + throw new IOException(error); + } + + clearBuffers(); + positionInBlock += toRead; + } + + endTargetBlocks(targetsStatus); + + // Currently we don't check the acks for packets, this is similar as + // block replication. + } catch (Throwable e) { + LOG.warn("Failed to recover striped block: " + blockGroup, e); + } finally { + datanode.decrementXmitsInProgress(); + // close block readers + for (StripedReader stripedReader : stripedReaders) { + closeBlockReader(stripedReader.blockReader); + } + for (int i = 0; i < targets.length; i++) { + IOUtils.closeStream(targetOutputStreams[i]); + IOUtils.closeStream(targetInputStreams[i]); + IOUtils.closeStream(targetSockets[i]); + } + } + } + + // init checksum from block reader + private void initChecksumAndBufferSizeIfNeeded(BlockReader blockReader) { + if (checksum == null) { + checksum = blockReader.getDataChecksum(); + bytesPerChecksum = checksum.getBytesPerChecksum(); + // The bufferSize is flat to divide bytesPerChecksum + int readBufferSize = STRIPED_READ_BUFFER_SIZE; + bufferSize = readBufferSize < bytesPerChecksum ? 
bytesPerChecksum : + readBufferSize - readBufferSize % bytesPerChecksum; + } else { + assert blockReader.getDataChecksum().equals(checksum); + } + } + + private void getTargetIndices() { + BitSet bitset = new BitSet(dataBlkNum + parityBlkNum); + for (int i = 0; i < sources.length; i++) { + bitset.set(liveIndices[i]); + } + int m = 0; + int k = 0; + for (int i = 0; i < dataBlkNum + parityBlkNum; i++) { + if (!bitset.get(i)) { + if (getBlockLen(blockGroup, i) > 0) { + if (m < targets.length) { + targetIndices[m++] = (short)i; + } + } else { + zeroStripeIndices[k++] = (short)i; + } + } + } + } + + private long getReadLength(int index) { + long blockLen = getBlockLen(blockGroup, index); + long remaining = blockLen - positionInBlock; + return remaining > bufferSize ? bufferSize : remaining; + } + + /** + * Read from minimum source DNs required for reconstruction in the iteration. + * First try the success list which we think they are the best DNs + * If source DN is corrupt or slow, try to read some other source DN, + * and will update the success list. + * + * Remember the updated success list and return it for following + * operations and next iteration read. + * + * @param success the initial success list of source DNs we think best + * @return updated success list of source DNs we do real read + * @throws IOException + */ + private int[] readMinimumStripedData4Recovery(final int[] success) + throws IOException { + int nsuccess = 0; + int[] newSuccess = new int[minRequiredSources]; + BitSet used = new BitSet(sources.length); + /* + * Read from minimum source DNs required, the success list contains + * source DNs which we think best. 
+ */ + for (int i = 0; i < minRequiredSources; i++) { + StripedReader reader = stripedReaders.get(success[i]); + if (getReadLength(liveIndices[success[i]]) > 0) { + Callable<Void> readCallable = readFromBlock( + reader.blockReader, reader.buffer); + Future<Void> f = readService.submit(readCallable); + futures.put(f, success[i]); + } else { + // If the read length is 0, we don't need to do real read + reader.buffer.position(0); + newSuccess[nsuccess++] = success[i]; + } + used.set(success[i]); + } + + while (!futures.isEmpty()) { + try { + StripingChunkReadResult result = + StripedBlockUtil.getNextCompletedStripedRead( + readService, futures, STRIPED_READ_TIMEOUT_MILLIS); + int resultIndex = -1; + if (result.state == StripingChunkReadResult.SUCCESSFUL) { + resultIndex = result.index; + } else if (result.state == StripingChunkReadResult.FAILED) { + // If read failed for some source DN, we should not use it anymore + // and schedule read from another source DN. + StripedReader failedReader = stripedReaders.get(result.index); + closeBlockReader(failedReader.blockReader); + failedReader.blockReader = null; + resultIndex = scheduleNewRead(used); + } else if (result.state == StripingChunkReadResult.TIMEOUT) { + // If timeout, we also schedule a new read. + resultIndex = scheduleNewRead(used); + } + if (resultIndex >= 0) { + newSuccess[nsuccess++] = resultIndex; + if (nsuccess >= minRequiredSources) { + // cancel remaining reads if we read successfully from minimum + // number of source DNs required by reconstruction. 
+ cancelReads(futures.keySet()); + futures.clear(); + break; + } + } + } catch (InterruptedException e) { + LOG.info("Read data interrupted.", e); + break; + } + } + + if (nsuccess < minRequiredSources) { + String error = "Can't read data from minimum number of sources " + + "required by reconstruction, block id: " + blockGroup.getBlockId(); + throw new IOException(error); + } + + return newSuccess; + } + + private void paddingBufferToLen(ByteBuffer buffer, int len) { + int toPadding = len - buffer.position(); + for (int i = 0; i < toPadding; i++) { + buffer.put((byte) 0); + } + } + + // Initialize decoder + private void initDecoderIfNecessary() { + if (decoder == null) { + decoder = newDecoder(dataBlkNum, parityBlkNum); + } + } + + private int[] getErasedIndices(boolean[] targetsStatus) { + int[] result = new int[targets.length]; + int m = 0; + for (int i = 0; i < targets.length; i++) { + if (targetsStatus[i]) { + result[m++] = convertIndex4Decode(targetIndices[i], + dataBlkNum, parityBlkNum); + } + } + return Arrays.copyOf(result, m); + } + + private void recoverTargets(int[] success, boolean[] targetsStatus, + int toRecoverLen) { + initDecoderIfNecessary(); + ByteBuffer[] inputs = new ByteBuffer[dataBlkNum + parityBlkNum]; + for (int i = 0; i < success.length; i++) { + StripedReader reader = stripedReaders.get(success[i]); + ByteBuffer buffer = reader.buffer; + paddingBufferToLen(buffer, toRecoverLen); + inputs[convertIndex4Decode(reader.index, dataBlkNum, parityBlkNum)] = + (ByteBuffer)buffer.flip(); + } + if (success.length < dataBlkNum) { + for (int i = 0; i < zeroStripeBuffers.length; i++) { + ByteBuffer buffer = zeroStripeBuffers[i]; + paddingBufferToLen(buffer, toRecoverLen); + int index = convertIndex4Decode(zeroStripeIndices[i], dataBlkNum, + parityBlkNum); + inputs[index] = (ByteBuffer)buffer.flip(); + } + } + int[] erasedIndices = getErasedIndices(targetsStatus); + ByteBuffer[] outputs = new ByteBuffer[erasedIndices.length]; + int m = 0; + for (int i = 
0; i < targetBuffers.length; i++) { + if (targetsStatus[i]) { + outputs[m++] = targetBuffers[i]; + outputs[i].limit(toRecoverLen); + } + } + decoder.decode(inputs, erasedIndices, outputs); + + for (int i = 0; i < targets.length; i++) { + if (targetsStatus[i]) { + long blockLen = getBlockLen(blockGroup, targetIndices[i]); + long remaining = blockLen - positionInBlock; + if (remaining < 0) { + targetBuffers[i].limit(0); + } else if (remaining < toRecoverLen) { + targetBuffers[i].limit((int)remaining); + } + } + } + } + + /** + * Schedule a read from some new source DN if some DN is corrupted + * or slow, this is called from the read iteration. + * Initially we may only have <code>minRequiredSources</code> number of + * StripedReader. + * If the position is at the end of target block, don't need to do + * real read, and return the array index of source DN, otherwise -1. + * + * @param used the used source DNs in this iteration. + * @return the array index of source DN if don't need to do real read. + */ + private int scheduleNewRead(BitSet used) { + StripedReader reader = null; + // step1: initially we may only have <code>minRequiredSources</code> + // number of StripedReader, and there may be some source DNs we never + // read before, so will try to create StripedReader for one new source DN + // and try to read from it. If found, go to step 3. + int m = stripedReaders.size(); + while (reader == null && m < sources.length) { + reader = addStripedReader(m, positionInBlock); + if (getReadLength(liveIndices[m]) > 0) { + if (reader.blockReader == null) { + reader = null; + m++; + } + } else { + used.set(m); + return m; + } + } + + // step2: if there is no new source DN we can use, try to find a source + // DN we ever read from but because some reason, e.g., slow, it + // is not in the success DN list at the begin of this iteration, so + // we have not tried it in this iteration. Now we have a chance to + // revisit it again. 
+ for (int i = 0; reader == null && i < stripedReaders.size(); i++) { + if (!used.get(i)) { + StripedReader r = stripedReaders.get(i); + if (getReadLength(liveIndices[i]) > 0) { + closeBlockReader(r.blockReader); + r.blockReader = newBlockReader( + getBlock(blockGroup, liveIndices[i]), positionInBlock, + sources[i]); + if (r.blockReader != null) { + m = i; + reader = r; + } + } else { + used.set(i); + r.buffer.position(0); + return i; + } + } + } + + // step3: schedule if find a correct source DN and need to do real read. + if (reader != null) { + Callable<Void> readCallable = readFromBlock( + reader.blockReader, reader.buffer); + Future<Void> f = readService.submit(readCallable); + futures.put(f, m); + used.set(m); + } + + return -1; + } + + // cancel all reads. + private void cancelReads(Collection<Future<Void>> futures) { + for (Future<Void> future : futures) { + future.cancel(true); + } + } + + private Callable<Void> readFromBlock(final BlockReader reader, + final ByteBuffer buf) { + return new Callable<Void>() { + + @Override + public Void call() throws Exception { + try { + actualReadFromBlock(reader, buf); + return null; + } catch (IOException e) { + LOG.info(e.getMessage()); + throw e; + } + } + + }; + } + + /** + * Read bytes from block + */ + private void actualReadFromBlock(BlockReader reader, ByteBuffer buf) + throws IOException { + int len = buf.remaining(); + int n = 0; + while (n < len) { + int nread = reader.read(buf); + if (nread <= 0) { + break; + } + n += nread; + } + } + + // close block reader + private void closeBlockReader(BlockReader blockReader) { + try { + if (blockReader != null) { + blockReader.close(); + } + } catch (IOException e) { + // ignore + } + } + + private InetSocketAddress getSocketAddress4Transfer(DatanodeInfo dnInfo) { + return NetUtils.createSocketAddr(dnInfo.getXferAddr( + datanode.getDnConf().getConnectToDnViaHostname())); + } + + private BlockReader newBlockReader(final ExtendedBlock block, + long offsetInBlock, 
DatanodeInfo dnInfo) { + if (offsetInBlock >= block.getNumBytes()) { + return null; + } + try { + InetSocketAddress dnAddr = getSocketAddress4Transfer(dnInfo); + Token<BlockTokenIdentifier> blockToken = datanode.getBlockAccessToken( + block, EnumSet.of(BlockTokenIdentifier.AccessMode.READ)); + /* + * This can be further improved if the replica is local, then we can + * read directly from DN and need to check the replica is FINALIZED + * state, notice we should not use short-circuit local read which + * requires config for domain-socket in UNIX or legacy config in Windows. ++ * ++ * TODO: add proper tracer + */ + return RemoteBlockReader2.newBlockReader( + "dummy", block, blockToken, offsetInBlock, + block.getNumBytes() - offsetInBlock, true, + "", newConnectedPeer(block, dnAddr, blockToken, dnInfo), dnInfo, - null, cachingStrategy); ++ null, cachingStrategy, null); + } catch (IOException e) { + return null; + } + } + + private Peer newConnectedPeer(ExtendedBlock b, InetSocketAddress addr, + Token<BlockTokenIdentifier> blockToken, DatanodeID datanodeId) + throws IOException { + Peer peer = null; + boolean success = false; + Socket sock = null; + final int socketTimeout = datanode.getDnConf().getSocketTimeout(); + try { + sock = NetUtils.getDefaultSocketFactory(conf).createSocket(); + NetUtils.connect(sock, addr, socketTimeout); + peer = DFSUtilClient.peerFromSocketAndKey(datanode.getSaslClient(), + sock, datanode.getDataEncryptionKeyFactoryForBlock(b), + blockToken, datanodeId); + peer.setReadTimeout(socketTimeout); + success = true; + return peer; + } finally { + if (!success) { + IOUtils.cleanup(LOG, peer); + IOUtils.closeSocket(sock); + } + } + } + + /** + * Send data to targets + */ + private int transferData2Targets(boolean[] targetsStatus) { + int nsuccess = 0; + for (int i = 0; i < targets.length; i++) { + if (targetsStatus[i]) { + boolean success = false; + try { + ByteBuffer buffer = targetBuffers[i]; + + if (buffer.remaining() == 0) { + continue; + } + + 
checksum.calculateChunkedSums( + buffer.array(), 0, buffer.remaining(), checksumBuf, 0); + + int ckOff = 0; + while (buffer.remaining() > 0) { + DFSPacket packet = new DFSPacket(packetBuf, maxChunksPerPacket, + blockOffset4Targets[i], seqNo4Targets[i]++, checksumSize, false); + int maxBytesToPacket = maxChunksPerPacket * bytesPerChecksum; + int toWrite = buffer.remaining() > maxBytesToPacket ? + maxBytesToPacket : buffer.remaining(); + int ckLen = ((toWrite - 1) / bytesPerChecksum + 1) * checksumSize; + packet.writeChecksum(checksumBuf, ckOff, ckLen); + ckOff += ckLen; + packet.writeData(buffer, toWrite); + + // Send packet + packet.writeTo(targetOutputStreams[i]); + + blockOffset4Targets[i] += toWrite; + nsuccess++; + success = true; + } + } catch (IOException e) { + LOG.warn(e.getMessage()); + } + targetsStatus[i] = success; + } + } + return nsuccess; + } + + /** + * clear all buffers + */ + private void clearBuffers() { + for (StripedReader stripedReader : stripedReaders) { + if (stripedReader.buffer != null) { + stripedReader.buffer.clear(); + } + } + + if (zeroStripeBuffers != null) { + for (int i = 0; i < zeroStripeBuffers.length; i++) { + zeroStripeBuffers[i].clear(); + } + } + + for (int i = 0; i < targetBuffers.length; i++) { + if (targetBuffers[i] != null) { + cleanBuffer(targetBuffers[i]); + } + } + } + + private ByteBuffer cleanBuffer(ByteBuffer buffer) { + Arrays.fill(buffer.array(), (byte) 0); + return (ByteBuffer)buffer.clear(); + } + + // send an empty packet to mark the end of the block + private void endTargetBlocks(boolean[] targetsStatus) { + for (int i = 0; i < targets.length; i++) { + if (targetsStatus[i]) { + try { + DFSPacket packet = new DFSPacket(packetBuf, 0, + blockOffset4Targets[i], seqNo4Targets[i]++, checksumSize, true); + packet.writeTo(targetOutputStreams[i]); + targetOutputStreams[i].flush(); + } catch (IOException e) { + LOG.warn(e.getMessage()); + } + } + } + } + + /** + * Initialize output/input streams for transferring data to 
target + * and send create block request. + */ + private int initTargetStreams(boolean[] targetsStatus) { + int nsuccess = 0; + for (int i = 0; i < targets.length; i++) { + Socket socket = null; + DataOutputStream out = null; + DataInputStream in = null; + boolean success = false; + try { + InetSocketAddress targetAddr = + getSocketAddress4Transfer(targets[i]); + socket = datanode.newSocket(); + NetUtils.connect(socket, targetAddr, + datanode.getDnConf().getSocketTimeout()); + socket.setSoTimeout(datanode.getDnConf().getSocketTimeout()); + + ExtendedBlock block = getBlock(blockGroup, targetIndices[i]); + Token<BlockTokenIdentifier> blockToken = + datanode.getBlockAccessToken(block, + EnumSet.of(BlockTokenIdentifier.AccessMode.WRITE)); + + long writeTimeout = datanode.getDnConf().getSocketWriteTimeout(); + OutputStream unbufOut = NetUtils.getOutputStream(socket, writeTimeout); + InputStream unbufIn = NetUtils.getInputStream(socket); + DataEncryptionKeyFactory keyFactory = + datanode.getDataEncryptionKeyFactoryForBlock(block); + IOStreamPair saslStreams = datanode.getSaslClient().socketSend( + socket, unbufOut, unbufIn, keyFactory, blockToken, targets[i]); + + unbufOut = saslStreams.out; + unbufIn = saslStreams.in; + + out = new DataOutputStream(new BufferedOutputStream(unbufOut, - DFSUtil.getSmallBufferSize(conf))); ++ DFSUtilClient.getSmallBufferSize(conf))); + in = new DataInputStream(unbufIn); + + DatanodeInfo source = new DatanodeInfo(datanode.getDatanodeId()); + new Sender(out).writeBlock(block, targetStorageTypes[i], + blockToken, "", new DatanodeInfo[]{targets[i]}, + new StorageType[]{targetStorageTypes[i]}, source, + BlockConstructionStage.PIPELINE_SETUP_CREATE, 0, 0, 0, 0, + checksum, cachingStrategy, false, false, null); + + targetSockets[i] = socket; + targetOutputStreams[i] = out; + targetInputStreams[i] = in; + nsuccess++; + success = true; + } catch (Throwable e) { + LOG.warn(e.getMessage()); + } finally { + if (!success) { + IOUtils.closeStream(out); 
+ IOUtils.closeStream(in); + IOUtils.closeStream(socket); + } + } + targetsStatus[i] = success; + } + return nsuccess; + } + } + + private static class StripedReader { + private final short index; // internal block index + private BlockReader blockReader; + private ByteBuffer buffer; + + private StripedReader(short index) { + this.index = index; + } + } +}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/8fd55202/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirConcatOp.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/8fd55202/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirStatAndListingOp.java ---------------------------------------------------------------------- diff --cc hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirStatAndListingOp.java index 3e001d3,98af592..67c6fc1 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirStatAndListingOp.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirStatAndListingOp.java @@@ -180,9 -178,8 +180,10 @@@ class FSDirStatAndListingOp } final FileEncryptionInfo feInfo = isReservedName ? null - : fsd.getFileEncryptionInfo(inode, iip.getPathSnapshotId(), iip); + : FSDirEncryptionZoneOp.getFileEncryptionInfo(fsd, inode, - iip.getPathSnapshotId(), iip); ++ iip.getPathSnapshotId(), iip); + final ErasureCodingPolicy ecPolicy = FSDirErasureCodingOp. + getErasureCodingPolicy(fsd.getFSNamesystem(), iip); final LocatedBlocks blocks = bm.createLocatedBlocks( inode.getBlocks(iip.getPathSnapshotId()), fileSize, isUc, offset, @@@ -443,12 -440,9 +444,12 @@@ long blocksize = 0; final boolean isEncrypted; - final FileEncryptionInfo feInfo = isRawPath ? null : - fsd.getFileEncryptionInfo(node, snapshot, iip); + final FileEncryptionInfo feInfo = isRawPath ? 
null : FSDirEncryptionZoneOp + .getFileEncryptionInfo(fsd, node, snapshot, iip); + final ErasureCodingPolicy ecPolicy = FSDirErasureCodingOp.getErasureCodingPolicy( + fsd.getFSNamesystem(), iip); + if (node.isFile()) { final INodeFile fileNode = node.asFile(); size = fileNode.computeFileSize(snapshot); @@@ -500,10 -495,8 +503,10 @@@ long blocksize = 0; LocatedBlocks loc = null; final boolean isEncrypted; - final FileEncryptionInfo feInfo = isRawPath ? null : - fsd.getFileEncryptionInfo(node, snapshot, iip); + final FileEncryptionInfo feInfo = isRawPath ? null : FSDirEncryptionZoneOp + .getFileEncryptionInfo(fsd, node, snapshot, iip); + final ErasureCodingPolicy ecPolicy = FSDirErasureCodingOp.getErasureCodingPolicy( + fsd.getFSNamesystem(), iip); if (node.isFile()) { final INodeFile fileNode = node.asFile(); size = fileNode.computeFileSize(snapshot); http://git-wip-us.apache.org/repos/asf/hadoop/blob/8fd55202/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirWriteFileOp.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/8fd55202/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/8fd55202/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java ---------------------------------------------------------------------- diff --cc hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java index b6b151c,4dda27d..a94b61c --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java @@@ -131,11 -131,9 +131,10 @@@ import 
org.apache.commons.logging.impl. import org.apache.hadoop.HadoopIllegalArgumentException; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.conf.Configuration; - import org.apache.hadoop.crypto.CipherSuite; import org.apache.hadoop.crypto.CryptoProtocolVersion; - import org.apache.hadoop.crypto.key.KeyProvider; + import org.apache.hadoop.crypto.key.KeyProvider.Metadata; import org.apache.hadoop.crypto.key.KeyProviderCryptoExtension; +import org.apache.hadoop.crypto.key.KeyProviderCryptoExtension.EncryptedKeyVersion; import org.apache.hadoop.fs.BatchedRemoteIterator.BatchedListEntries; import org.apache.hadoop.fs.CacheFlag; import org.apache.hadoop.fs.ContentSummary; http://git-wip-us.apache.org/repos/asf/hadoop/blob/8fd55202/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/LeaseManager.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/8fd55202/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java ---------------------------------------------------------------------- diff --cc hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java index 4143964,79a3773..e9363b4 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java @@@ -2057,18 -2043,6 +2061,18 @@@ class NameNodeRpcServer implements Name public void removeSpanReceiver(long id) throws IOException { checkNNStartup(); namesystem.checkSuperuserPrivilege(); - nn.spanReceiverHost.removeSpanReceiver(id); + nn.tracerConfigurationManager.removeSpanReceiver(id); } + + @Override // ClientProtocol + public ErasureCodingPolicy[] getErasureCodingPolicies() throws IOException { + checkNNStartup(); + return 
namesystem.getErasureCodingPolicies(); + } + + @Override // ClientProtocol + public ErasureCodingPolicy getErasureCodingPolicy(String src) throws IOException { + checkNNStartup(); + return namesystem.getErasureCodingPolicy(src); + } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/8fd55202/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NamenodeFsck.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/8fd55202/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/DatanodeProtocol.proto ---------------------------------------------------------------------- diff --cc hadoop-hdfs-project/hadoop-hdfs/src/main/proto/DatanodeProtocol.proto index 9d24b91,727259f..6dd7b89 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/DatanodeProtocol.proto +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/DatanodeProtocol.proto @@@ -33,7 -33,7 +33,8 @@@ package hadoop.hdfs.datanode import "HAServiceProtocol.proto"; import "hdfs.proto"; +import "erasurecoding.proto"; + import "HdfsServer.proto"; /** * Information to identify a datanode to a namenode http://git-wip-us.apache.org/repos/asf/hadoop/blob/8fd55202/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/HdfsServer.proto ---------------------------------------------------------------------- diff --cc hadoop-hdfs-project/hadoop-hdfs/src/main/proto/HdfsServer.proto index 0000000,3b60e51..66b2a33 mode 000000,100644..100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/HdfsServer.proto +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/HdfsServer.proto @@@ -1,0 -1,201 +1,198 @@@ + /** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + /** + * These .proto interfaces are private and stable. + * Please see http://wiki.apache.org/hadoop/Compatibility + * for what changes are allowed for a *stable* .proto interface. + */ + + // This file contains protocol buffers that are used throughout HDFS -- i.e. + // by the client, server, and data transfer protocols. + + + option java_package = "org.apache.hadoop.hdfs.protocol.proto"; + option java_outer_classname = "HdfsServerProtos"; + option java_generate_equals_and_hash = true; + package hadoop.hdfs; + + import "hdfs.proto"; + + /** - * A list of storage IDs. - */ -message StorageUuidsProto { - repeated string storageUuids = 1; -} - -/** + * Block access token information + */ + message BlockKeyProto { + required uint32 keyId = 1; // Key identifier + required uint64 expiryDate = 2; // Expiry time in milliseconds + optional bytes keyBytes = 3; // Key secret + } + + /** + * Current key and set of block keys at the namenode. 
+ */ + message ExportedBlockKeysProto { + required bool isBlockTokenEnabled = 1; + required uint64 keyUpdateInterval = 2; + required uint64 tokenLifeTime = 3; + required BlockKeyProto currentKey = 4; + repeated BlockKeyProto allKeys = 5; + } + + /** + * Block and datanodes where is it located + */ + message BlockWithLocationsProto { + required BlockProto block = 1; // Block + repeated string datanodeUuids = 2; // Datanodes with replicas of the block + repeated string storageUuids = 3; // Storages with replicas of the block + repeated StorageTypeProto storageTypes = 4; ++ ++ optional bytes indices = 5; ++ optional uint32 dataBlockNum = 6; ++ optional uint32 cellSize = 7; + } + + /** + * List of block with locations + */ + message BlocksWithLocationsProto { + repeated BlockWithLocationsProto blocks = 1; + } + + /** + * Editlog information with available transactions + */ + message RemoteEditLogProto { + required uint64 startTxId = 1; // Starting available edit log transaction + required uint64 endTxId = 2; // Ending available edit log transaction + optional bool isInProgress = 3 [default = false]; + } + + /** + * Enumeration of editlogs available on a remote namenode + */ + message RemoteEditLogManifestProto { + repeated RemoteEditLogProto logs = 1; + } + + /** + * Namespace information that describes namespace on a namenode + */ + message NamespaceInfoProto { + required string buildVersion = 1; // Software revision version (e.g. an svn or git revision) + required uint32 unused = 2; // Retained for backward compatibility + required string blockPoolID = 3; // block pool used by the namespace + required StorageInfoProto storageInfo = 4;// Node information + required string softwareVersion = 5; // Software version number (e.g. 
2.0.0) + optional uint64 capabilities = 6 [default = 0]; // feature flags + } + + /** + * State of a block replica at a datanode + */ + enum ReplicaStateProto { + FINALIZED = 0; // State of a replica when it is not modified + RBW = 1; // State of replica that is being written to + RWR = 2; // State of replica that is waiting to be recovered + RUR = 3; // State of replica that is under recovery + TEMPORARY = 4; // State of replica that is created for replication + } + + /** + * Block that needs to be recovered with at a given location + */ + message RecoveringBlockProto { + required uint64 newGenStamp = 1; // New genstamp post recovery + required LocatedBlockProto block = 2; // Block to be recovered + optional BlockProto truncateBlock = 3; // New block for recovery (truncate) + } + + /** + * Unique signature to identify checkpoint transactions. + */ + message CheckpointSignatureProto { + required string blockPoolId = 1; + required uint64 mostRecentCheckpointTxId = 2; + required uint64 curSegmentTxId = 3; + required StorageInfoProto storageInfo = 4; + } + + /** + * Command returned from primary to checkpointing namenode. + * This command has checkpoint signature that identifies + * checkpoint transaction and is needed for further + * communication related to checkpointing. + */ + message CheckpointCommandProto { + // Unique signature to identify checkpoint transation + required CheckpointSignatureProto signature = 1; + + // If true, return transfer image to primary upon the completion of checkpoint + required bool needToReturnImage = 2; + } + + /** + * Command sent from one namenode to another namenode. + */ + message NamenodeCommandProto { + enum Type { + NamenodeCommand = 0; // Base command + CheckPointCommand = 1; // Check point command + } + required uint32 action = 1; + required Type type = 2; + optional CheckpointCommandProto checkpointCmd = 3; + } + + /** + * void request + */ + message VersionRequestProto { + } + + /** + * Version response from namenode. 
+ */ + message VersionResponseProto { + required NamespaceInfoProto info = 1; + } + + /** + * Common node information shared by all the nodes in the cluster + */ + message StorageInfoProto { + required uint32 layoutVersion = 1; // Layout version of the file system + required uint32 namespceID = 2; // File system namespace ID + required string clusterID = 3; // ID of the cluster + required uint64 cTime = 4; // File system creation time + } + + /** + * Information sent by a namenode to identify itself to the primary namenode. + */ + message NamenodeRegistrationProto { + required string rpcAddress = 1; // host:port of the namenode RPC address + required string httpAddress = 2; // host:port of the namenode http server + enum NamenodeRoleProto { + NAMENODE = 1; + BACKUP = 2; + CHECKPOINT = 3; + } + required StorageInfoProto storageInfo = 3; // Node information + optional NamenodeRoleProto role = 4 [default = NAMENODE]; // Namenode role + } http://git-wip-us.apache.org/repos/asf/hadoop/blob/8fd55202/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/8fd55202/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/BlockReaderTestUtil.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/8fd55202/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/8fd55202/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSClientRetries.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/8fd55202/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSUtil.java 
---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/8fd55202/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDecommission.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/8fd55202/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestLease.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/8fd55202/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/protocolPB/TestPBHelper.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/8fd55202/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/8fd55202/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockTokenWithDFS.java ---------------------------------------------------------------------- diff --cc hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockTokenWithDFS.java index d1b16b1,50d548a..ce7aee3 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockTokenWithDFS.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockTokenWithDFS.java @@@ -73,21 -74,17 +73,21 @@@ public class TestBlockTokenWithDFS private static final String FILE_TO_READ = "/fileToRead.dat"; private static final String FILE_TO_WRITE = "/fileToWrite.dat"; private static final String FILE_TO_APPEND = "/fileToAppend.dat"; - private final byte[] rawData = new byte[FILE_SIZE]; { - ((Log4JLogger) 
DFSClient.LOG).getLogger().setLevel(Level.ALL); + GenericTestUtils.setLogLevel(DFSClient.LOG, Level.ALL); + } + + public static byte[] generateBytes(int fileSize){ Random r = new Random(); + byte[] rawData = new byte[fileSize]; r.nextBytes(rawData); + return rawData; } - private void createFile(FileSystem fs, Path filename) throws IOException { + private void createFile(FileSystem fs, Path filename, byte[] expected) throws IOException { FSDataOutputStream out = fs.create(filename); - out.write(rawData); + out.write(expected); out.close(); } http://git-wip-us.apache.org/repos/asf/hadoop/blob/8fd55202/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicy.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/8fd55202/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFSEditLogLoader.java ---------------------------------------------------------------------- diff --cc hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFSEditLogLoader.java index 917b0f2,df07a62..2bb3d5f --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFSEditLogLoader.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFSEditLogLoader.java @@@ -55,9 -46,8 +54,10 @@@ import org.apache.hadoop.hdfs.server.bl import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory; import org.apache.hadoop.hdfs.server.namenode.FSEditLogLoader.EditLogValidation; import org.apache.hadoop.hdfs.server.namenode.NNStorage.NameNodeDirType; +import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoStriped; +import org.apache.hadoop.hdfs.protocol.Block; import org.apache.hadoop.io.IOUtils; + import org.apache.hadoop.test.GenericTestUtils; import org.apache.hadoop.test.PathUtils; import org.apache.log4j.Level; import 
org.junit.Test; http://git-wip-us.apache.org/repos/asf/hadoop/blob/8fd55202/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFileTruncate.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/8fd55202/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFsck.java ---------------------------------------------------------------------- diff --cc hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFsck.java index 5efc94d,a84ddd0..6df88fd --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFsck.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFsck.java @@@ -1660,60 -1657,4 +1662,60 @@@ public class TestFsck } } } + + @Test + public void testECFsck() throws Exception { + MiniDFSCluster cluster = null; + FileSystem fs = null; + try { + Configuration conf = new HdfsConfiguration(); + final long precision = 1L; + conf.setLong(DFSConfigKeys.DFS_NAMENODE_ACCESSTIME_PRECISION_KEY, precision); + conf.setLong(DFSConfigKeys.DFS_BLOCKREPORT_INTERVAL_MSEC_KEY, 10000L); + int totalSize = ErasureCodingPolicyManager.getSystemDefaultPolicy().getNumDataUnits() + + ErasureCodingPolicyManager.getSystemDefaultPolicy().getNumParityUnits(); + cluster = new MiniDFSCluster.Builder(conf).numDataNodes(totalSize).build(); + fs = cluster.getFileSystem(); + + // create a contiguous file + Path replDirPath = new Path("/replicated"); + Path replFilePath = new Path(replDirPath, "replfile"); + final short factor = 3; + DFSTestUtil.createFile(fs, replFilePath, 1024, factor, 0); + DFSTestUtil.waitReplication(fs, replFilePath, factor); + + // create a large striped file + Path ecDirPath = new Path("/striped"); + Path largeFilePath = new Path(ecDirPath, "largeFile"); + DFSTestUtil.createStripedFile(cluster, largeFilePath, 
ecDirPath, 1, 2, true); + + // create a small striped file + Path smallFilePath = new Path(ecDirPath, "smallFile"); + DFSTestUtil.writeFile(fs, smallFilePath, "hello world!"); + + long replTime = fs.getFileStatus(replFilePath).getAccessTime(); + long ecTime = fs.getFileStatus(largeFilePath).getAccessTime(); + Thread.sleep(precision); + setupAuditLogs(); + String outStr = runFsck(conf, 0, true, "/"); + verifyAuditLogs(); + assertEquals(replTime, fs.getFileStatus(replFilePath).getAccessTime()); + assertEquals(ecTime, fs.getFileStatus(largeFilePath).getAccessTime()); + System.out.println(outStr); + assertTrue(outStr.contains(NamenodeFsck.HEALTHY_STATUS)); + if (fs != null) {try{fs.close();} catch(Exception e){}} + cluster.shutdown(); + + // restart the cluster; bring up namenode but not the data nodes + cluster = new MiniDFSCluster.Builder(conf) + .numDataNodes(0).format(false).build(); + outStr = runFsck(conf, 1, true, "/"); + // expect the result is corrupt + assertTrue(outStr.contains(NamenodeFsck.CORRUPT_STATUS)); + System.out.println(outStr); + } finally { + if (fs != null) {try{fs.close();} catch(Exception e){}} + if (cluster != null) { cluster.shutdown(); } + } + } - } + } http://git-wip-us.apache.org/repos/asf/hadoop/blob/8fd55202/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestRetryCacheWithHA.java ----------------------------------------------------------------------
