HDFS-7435. PB encoding of block reports is very inefficient. Contributed by Daryn Sharp.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/d324164a Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/d324164a Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/d324164a Branch: refs/heads/HDFS-7285 Commit: d324164a51a43d72c02567248bd9f0f12b244a40 Parents: f446669 Author: Kihwal Lee <kih...@apache.org> Authored: Fri Mar 13 14:13:55 2015 -0500 Committer: Kihwal Lee <kih...@apache.org> Committed: Fri Mar 13 14:23:37 2015 -0500 ---------------------------------------------------------------------- hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt | 3 + .../hadoop/hdfs/protocol/BlockListAsLongs.java | 660 +++++++++++-------- .../DatanodeProtocolClientSideTranslatorPB.java | 22 +- .../DatanodeProtocolServerSideTranslatorPB.java | 14 +- .../apache/hadoop/hdfs/protocolPB/PBHelper.java | 6 +- .../server/blockmanagement/BlockManager.java | 16 +- .../hdfs/server/datanode/BPServiceActor.java | 13 +- .../datanode/fsdataset/impl/FsDatasetImpl.java | 20 +- .../hdfs/server/namenode/NameNodeRpcServer.java | 2 +- .../server/protocol/DatanodeRegistration.java | 9 + .../hdfs/server/protocol/NamespaceInfo.java | 52 ++ .../server/protocol/StorageBlockReport.java | 8 +- .../src/main/proto/DatanodeProtocol.proto | 2 + .../hadoop-hdfs/src/main/proto/hdfs.proto | 1 + .../hdfs/protocol/TestBlockListAsLongs.java | 237 +++++++ .../blockmanagement/TestBlockManager.java | 8 +- .../server/datanode/BlockReportTestBase.java | 27 +- .../server/datanode/SimulatedFSDataset.java | 11 +- .../TestBlockHasMultipleReplicasOnSameDN.java | 9 +- .../datanode/TestDataNodeVolumeFailure.java | 4 +- ...TestDnRespectsBlockReportSplitThreshold.java | 2 +- .../extdataset/ExternalDatasetImpl.java | 2 +- .../server/namenode/NNThroughputBenchmark.java | 23 +- .../hdfs/server/namenode/TestDeadDatanode.java | 3 +- .../hdfs/server/namenode/TestFSImage.java | 2 + .../TestOfflineEditsViewer.java | 9 +- 26 files changed, 811 insertions(+), 354 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/d324164a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt index 909182b..ac7e096 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt +++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt @@ -743,6 +743,9 @@ Release 2.7.0 - UNRELEASED HDFS-7491. Add incremental blockreport latency to DN metrics. (Ming Ma via cnauroth) + HDFS-7435. PB encoding of block reports is very inefficient. + (Daryn Sharp via kihwal) + OPTIMIZATIONS HDFS-7454. Reduce memory footprint for AclEntries in NameNode. 
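The heart of the change is visible in the BlockListAsLongs rewrite that follows: instead of shipping every block as three or four entries of a repeated uint64 protobuf field (which boxes each value into a Long and repeatedly re-allocates an ArrayList, putting pressure on GC), the new Builder writes each replica as four varints into a single ByteString that the NameNode can decode in place. The following is a condensed, self-contained sketch of that encoding and decode path, assuming only protobuf-java 2.x on the classpath; the class name, the sample values, and the loop structure are illustrative, not part of the patch itself.

import java.io.IOException;

import com.google.protobuf.ByteString;
import com.google.protobuf.CodedInputStream;
import com.google.protobuf.CodedOutputStream;

public class BlockReportBufferSketch {
  public static void main(String[] args) throws IOException {
    // encode: per replica, write blockId (zig-zag), length, genstamp, state
    ByteString.Output out = ByteString.newOutput(64 * 1024);
    CodedOutputStream cos = CodedOutputStream.newInstance(out);
    long[][] replicas = { {1, 11, 111, 0}, {2, 22, 222, 1} }; // id, len, gs, state
    for (long[] r : replicas) {
      cos.writeSInt64NoTag(r[0]);   // zig-zag keeps legacy negative block ids small
      cos.writeRawVarint64(r[1]);   // block length
      cos.writeRawVarint64(r[2]);   // generation stamp
      cos.writeRawVarint64(r[3]);   // replica state (long varint leaves upper bits free)
    }
    cos.flush();
    ByteString buffer = out.toByteString();

    // decode in place, masking off bits reserved for future use
    final long NUM_BYTES_MASK = (-1L) >>> (64 - 48);
    final long REPLICA_STATE_MASK = (-1L) >>> (64 - 4);
    CodedInputStream cis = buffer.newCodedInput();
    for (int i = 0; i < replicas.length; i++) {
      long id = cis.readSInt64();
      long len = cis.readRawVarint64() & NUM_BYTES_MASK;
      long gs = cis.readRawVarint64();
      long state = cis.readRawVarint64() & REPLICA_STATE_MASK;
      System.out.println(id + " " + len + " " + gs + " " + state);
    }
  }
}

The per-replica cost on the wire drops to a handful of varint bytes, and neither encode nor decode allocates per-block objects.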
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d324164a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/BlockListAsLongs.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/BlockListAsLongs.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/BlockListAsLongs.java index 4389714..1c89ee4 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/BlockListAsLongs.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/BlockListAsLongs.java @@ -17,342 +17,458 @@ */ package org.apache.hadoop.hdfs.protocol; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; import java.util.Iterator; import java.util.List; -import java.util.Random; -import com.google.common.annotations.VisibleForTesting; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.hdfs.protocol.BlockListAsLongs.BlockReportReplica; import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState; import org.apache.hadoop.hdfs.server.datanode.Replica; +import com.google.common.base.Preconditions; +import com.google.protobuf.ByteString; +import com.google.protobuf.CodedInputStream; +import com.google.protobuf.CodedOutputStream; -/** - * This class provides an interface for accessing list of blocks that - * has been implemented as long[]. - * This class is useful for block report. Rather than send block reports - * as a Block[] we can send it as a long[]. - * - * The structure of the array is as follows: - * 0: the length of the finalized replica list; - * 1: the length of the under-construction replica list; - * - followed by finalized replica list where each replica is represented by - * 3 longs: one for the blockId, one for the block length, and one for - * the generation stamp; - * - followed by the invalid replica represented with three -1s; - * - followed by the under-construction replica list where each replica is - * represented by 4 longs: three for the block id, length, generation - * stamp, and the fourth for the replica state. 
- */ @InterfaceAudience.Private @InterfaceStability.Evolving -public class BlockListAsLongs implements Iterable<Block> { +public abstract class BlockListAsLongs implements Iterable<BlockReportReplica> { + private final static int CHUNK_SIZE = 64*1024; // 64K + private static long[] EMPTY_LONGS = new long[]{0, 0}; + + public static BlockListAsLongs EMPTY = new BlockListAsLongs() { + @Override + public int getNumberOfBlocks() { + return 0; + } + @Override + public ByteString getBlocksBuffer() { + return ByteString.EMPTY; + } + @Override + public long[] getBlockListAsLongs() { + return EMPTY_LONGS; + } + @Override + public Iterator<BlockReportReplica> iterator() { + return Collections.emptyIterator(); + } + }; + /** - * A finalized block as 3 longs - * block-id and block length and generation stamp + * Prepare an instance to in-place decode the given ByteString buffer + * @param numBlocks - blocks in the buffer + * @param blocksBuf - ByteString encoded varints + * @return BlockListAsLongs */ - private static final int LONGS_PER_FINALIZED_BLOCK = 3; + public static BlockListAsLongs decodeBuffer(final int numBlocks, + final ByteString blocksBuf) { + return new BufferDecoder(numBlocks, blocksBuf); + } /** - * An under-construction block as 4 longs - * block-id and block length, generation stamp and replica state + * Prepare an instance to in-place decode the given ByteString buffers + * @param numBlocks - blocks in the buffers + * @param blocksBufs - list of ByteString encoded varints + * @return BlockListAsLongs */ - private static final int LONGS_PER_UC_BLOCK = 4; - - /** Number of longs in the header */ - private static final int HEADER_SIZE = 2; + public static BlockListAsLongs decodeBuffers(final int numBlocks, + final List<ByteString> blocksBufs) { + // this doesn't actually copy the data + return decodeBuffer(numBlocks, ByteString.copyFrom(blocksBufs)); + } /** - * Returns the index of the first long in blockList - * belonging to the specified block. - * The first long contains the block id. + * Prepare an instance to in-place decode the given list of Longs. Note + * it's much more efficient to decode ByteString buffers and only exists + * for compatibility. + * @param blocksList - list of longs + * @return BlockListAsLongs */ - private int index2BlockId(int blockIndex) { - if(blockIndex < 0 || blockIndex > getNumberOfBlocks()) - return -1; - int finalizedSize = getNumberOfFinalizedReplicas(); - if(blockIndex < finalizedSize) - return HEADER_SIZE + blockIndex * LONGS_PER_FINALIZED_BLOCK; - return HEADER_SIZE + (finalizedSize + 1) * LONGS_PER_FINALIZED_BLOCK - + (blockIndex - finalizedSize) * LONGS_PER_UC_BLOCK; + public static BlockListAsLongs decodeLongs(List<Long> blocksList) { + return blocksList.isEmpty() ? EMPTY : new LongsDecoder(blocksList); } - private final long[] blockList; - /** - * Create block report from finalized and under construction lists of blocks. - * - * @param finalized - list of finalized blocks - * @param uc - list of under construction blocks + * Prepare an instance to encode the collection of replicas into an + * efficient ByteString. + * @param replicas - replicas to encode + * @return BlockListAsLongs */ - public BlockListAsLongs(final List<? extends Replica> finalized, - final List<? extends Replica> uc) { - int finalizedSize = finalized == null ? 0 : finalized.size(); - int ucSize = uc == null ? 
0 : uc.size(); - int len = HEADER_SIZE - + (finalizedSize + 1) * LONGS_PER_FINALIZED_BLOCK - + ucSize * LONGS_PER_UC_BLOCK; - - blockList = new long[len]; + public static BlockListAsLongs encode( + final Collection<? extends Replica> replicas) { + BlockListAsLongs.Builder builder = builder(); + for (Replica replica : replicas) { + builder.add(replica); + } + return builder.build(); + } - // set the header - blockList[0] = finalizedSize; - blockList[1] = ucSize; + public static Builder builder() { + return new BlockListAsLongs.Builder(); + } - // set finalized blocks - for (int i = 0; i < finalizedSize; i++) { - setBlock(i, finalized.get(i)); - } + /** + * The number of blocks + * @return - the number of blocks + */ + abstract public int getNumberOfBlocks(); - // set invalid delimiting block - setDelimitingBlock(finalizedSize); + /** + * Very efficient encoding of the block report into a ByteString to avoid + * the overhead of protobuf repeating fields. Primitive repeating fields + * require re-allocs of an ArrayList<Long> and the associated (un)boxing + * overhead which puts pressure on GC. + * + * The structure of the buffer is as follows: + * - each replica is represented by 4 longs: + * blockId, block length, genstamp, replica state + * + * @return ByteString encoded block report + */ + abstract public ByteString getBlocksBuffer(); - // set under construction blocks - for (int i = 0; i < ucSize; i++) { - setBlock(finalizedSize + i, uc.get(i)); + /** + * List of ByteStrings that encode this block report + * + * @return ByteStrings + */ + public List<ByteString> getBlocksBuffers() { + final ByteString blocksBuf = getBlocksBuffer(); + final List<ByteString> buffers; + final int size = blocksBuf.size(); + if (size <= CHUNK_SIZE) { + buffers = Collections.singletonList(blocksBuf); + } else { + buffers = new ArrayList<ByteString>(); + for (int pos=0; pos < size; pos += CHUNK_SIZE) { + // this doesn't actually copy the data + buffers.add(blocksBuf.substring(pos, Math.min(pos+CHUNK_SIZE, size))); + } } + return buffers; } /** - * Create block report from a list of finalized blocks. Used by - * NNThroughputBenchmark. - * - * @param blocks - list of finalized blocks + * Convert block report to old-style list of longs. Only used to + * re-encode the block report when the DN detects an older NN. This is + * inefficient, but in practice a DN is unlikely to be upgraded first + * + * The structure of the array is as follows: + * 0: the length of the finalized replica list; + * 1: the length of the under-construction replica list; + * - followed by finalized replica list where each replica is represented by + * 3 longs: one for the blockId, one for the block length, and one for + * the generation stamp; + * - followed by the invalid replica represented with three -1s; + * - followed by the under-construction replica list where each replica is + * represented by 4 longs: three for the block id, length, generation + * stamp, and the fourth for the replica state. + * @return list of longs */ - public BlockListAsLongs(final List<? extends Block> blocks) { - int finalizedSize = blocks == null ? 0 : blocks.size(); - int len = HEADER_SIZE - + (finalizedSize + 1) * LONGS_PER_FINALIZED_BLOCK; + abstract public long[] getBlockListAsLongs(); - blockList = new long[len]; + /** + * Returns a singleton iterator over blocks in the block report. Do not + * add the returned blocks to a collection. 
+ * @return Iterator + */ + abstract public Iterator<BlockReportReplica> iterator(); - // set the header - blockList[0] = finalizedSize; - blockList[1] = 0; + public static class Builder { + private final ByteString.Output out; + private final CodedOutputStream cos; + private int numBlocks = 0; + private int numFinalized = 0; - // set finalized blocks - for (int i = 0; i < finalizedSize; i++) { - setBlock(i, blocks.get(i)); + Builder() { + out = ByteString.newOutput(64*1024); + cos = CodedOutputStream.newInstance(out); } - // set invalid delimiting block - setDelimitingBlock(finalizedSize); - } - - public BlockListAsLongs() { - this((long[])null); - } + public void add(Replica replica) { + try { + // zig-zag to reduce size of legacy blocks + cos.writeSInt64NoTag(replica.getBlockId()); + cos.writeRawVarint64(replica.getBytesOnDisk()); + cos.writeRawVarint64(replica.getGenerationStamp()); + ReplicaState state = replica.getState(); + // although state is not a 64-bit value, using a long varint to + // allow for future use of the upper bits + cos.writeRawVarint64(state.getValue()); + if (state == ReplicaState.FINALIZED) { + numFinalized++; + } + numBlocks++; + } catch (IOException ioe) { + // shouldn't happen, ByteString.Output doesn't throw IOE + throw new IllegalStateException(ioe); + } + } - /** - * Constructor - * @param iBlockList - BlockListALongs create from this long[] parameter - */ - public BlockListAsLongs(final long[] iBlockList) { - if (iBlockList == null) { - blockList = new long[HEADER_SIZE]; - return; + public int getNumberOfBlocks() { + return numBlocks; + } + + public BlockListAsLongs build() { + try { + cos.flush(); + } catch (IOException ioe) { + // shouldn't happen, ByteString.Output doesn't throw IOE + throw new IllegalStateException(ioe); + } + return new BufferDecoder(numBlocks, numFinalized, out.toByteString()); } - blockList = iBlockList; } - public long[] getBlockListAsLongs() { - return blockList; - } + // decode new-style ByteString buffer based block report + private static class BufferDecoder extends BlockListAsLongs { + // reserve upper bits for future use. decoding masks off these bits to + // allow compatibility for the current through future release that may + // start using the bits + private static long NUM_BYTES_MASK = (-1L) >>> (64 - 48); + private static long REPLICA_STATE_MASK = (-1L) >>> (64 - 4); - /** - * Iterates over blocks in the block report. - * Avoids object allocation on each iteration. 
- */ - @InterfaceAudience.Private - @InterfaceStability.Evolving - public class BlockReportIterator implements Iterator<Block> { - private int currentBlockIndex; - private final Block block; - private ReplicaState currentReplicaState; - - BlockReportIterator() { - this.currentBlockIndex = 0; - this.block = new Block(); - this.currentReplicaState = null; + private final ByteString buffer; + private final int numBlocks; + private int numFinalized; + + BufferDecoder(final int numBlocks, final ByteString buf) { + this(numBlocks, -1, buf); } - @Override - public boolean hasNext() { - return currentBlockIndex < getNumberOfBlocks(); + BufferDecoder(final int numBlocks, final int numFinalized, + final ByteString buf) { + this.numBlocks = numBlocks; + this.numFinalized = numFinalized; + this.buffer = buf; } @Override - public Block next() { - block.set(blockId(currentBlockIndex), - blockLength(currentBlockIndex), - blockGenerationStamp(currentBlockIndex)); - currentReplicaState = blockReplicaState(currentBlockIndex); - currentBlockIndex++; - return block; + public int getNumberOfBlocks() { + return numBlocks; } @Override - public void remove() { - throw new UnsupportedOperationException("Sorry. can't remove."); + public ByteString getBlocksBuffer() { + return buffer; } - /** - * Get the state of the current replica. - * The state corresponds to the replica returned - * by the latest {@link #next()}. - */ - public ReplicaState getCurrentReplicaState() { - return currentReplicaState; + @Override + public long[] getBlockListAsLongs() { + // terribly inefficient but only occurs if server tries to transcode + // an undecoded buffer into longs - ie. it will never happen but let's + // handle it anyway + if (numFinalized == -1) { + int n = 0; + for (Replica replica : this) { + if (replica.getState() == ReplicaState.FINALIZED) { + n++; + } + } + numFinalized = n; + } + int numUc = numBlocks - numFinalized; + int size = 2 + 3*(numFinalized+1) + 4*(numUc); + long[] longs = new long[size]; + longs[0] = numFinalized; + longs[1] = numUc; + + int idx = 2; + int ucIdx = idx + 3*numFinalized; + // delimiter block + longs[ucIdx++] = -1; + longs[ucIdx++] = -1; + longs[ucIdx++] = -1; + + for (BlockReportReplica block : this) { + switch (block.getState()) { + case FINALIZED: { + longs[idx++] = block.getBlockId(); + longs[idx++] = block.getNumBytes(); + longs[idx++] = block.getGenerationStamp(); + break; + } + default: { + longs[ucIdx++] = block.getBlockId(); + longs[ucIdx++] = block.getNumBytes(); + longs[ucIdx++] = block.getGenerationStamp(); + longs[ucIdx++] = block.getState().getValue(); + break; + } + } + } + return longs; } - } - /** - * Returns an iterator over blocks in the block report. - */ - @Override - public Iterator<Block> iterator() { - return getBlockReportIterator(); - } - - /** - * Returns {@link BlockReportIterator}. - */ - public BlockReportIterator getBlockReportIterator() { - return new BlockReportIterator(); - } - - /** - * The number of blocks - * @return - the number of blocks - */ - public int getNumberOfBlocks() { - assert blockList.length == HEADER_SIZE + - (blockList[0] + 1) * LONGS_PER_FINALIZED_BLOCK + - blockList[1] * LONGS_PER_UC_BLOCK : - "Number of blocks is inconcistent with the array length"; - return getNumberOfFinalizedReplicas() + getNumberOfUCReplicas(); - } - - /** - * Returns the number of finalized replicas in the block report. 
- */ - private int getNumberOfFinalizedReplicas() { - return (int)blockList[0]; - } - - /** - * Returns the number of under construction replicas in the block report. - */ - private int getNumberOfUCReplicas() { - return (int)blockList[1]; + @Override + public Iterator<BlockReportReplica> iterator() { + return new Iterator<BlockReportReplica>() { + final BlockReportReplica block = new BlockReportReplica(); + final CodedInputStream cis = buffer.newCodedInput(); + private int currentBlockIndex = 0; + + @Override + public boolean hasNext() { + return currentBlockIndex < numBlocks; + } + + @Override + public BlockReportReplica next() { + currentBlockIndex++; + try { + // zig-zag to reduce size of legacy blocks and mask off bits + // we don't (yet) understand + block.setBlockId(cis.readSInt64()); + block.setNumBytes(cis.readRawVarint64() & NUM_BYTES_MASK); + block.setGenerationStamp(cis.readRawVarint64()); + long state = cis.readRawVarint64() & REPLICA_STATE_MASK; + block.setState(ReplicaState.getState((int)state)); + } catch (IOException e) { + throw new IllegalStateException(e); + } + return block; + } + + @Override + public void remove() { + throw new UnsupportedOperationException(); + } + }; + } } - /** - * Returns the id of the specified replica of the block report. - */ - private long blockId(int index) { - return blockList[index2BlockId(index)]; - } + // decode old style block report of longs + private static class LongsDecoder extends BlockListAsLongs { + private final List<Long> values; + private final int finalizedBlocks; + private final int numBlocks; - /** - * Returns the length of the specified replica of the block report. - */ - private long blockLength(int index) { - return blockList[index2BlockId(index) + 1]; - } + // set the header + LongsDecoder(List<Long> values) { + this.values = values.subList(2, values.size()); + this.finalizedBlocks = values.get(0).intValue(); + this.numBlocks = finalizedBlocks + values.get(1).intValue(); + } - /** - * Returns the generation stamp of the specified replica of the block report. - */ - private long blockGenerationStamp(int index) { - return blockList[index2BlockId(index) + 2]; - } + @Override + public int getNumberOfBlocks() { + return numBlocks; + } - /** - * Returns the state of the specified replica of the block report. - */ - private ReplicaState blockReplicaState(int index) { - if(index < getNumberOfFinalizedReplicas()) - return ReplicaState.FINALIZED; - return ReplicaState.getState((int)blockList[index2BlockId(index) + 3]); - } + @Override + public ByteString getBlocksBuffer() { + Builder builder = builder(); + for (Replica replica : this) { + builder.add(replica); + } + return builder.build().getBlocksBuffer(); + } - /** - * Corrupt the generation stamp of the block with the given index. - * Not meant to be used outside of tests. - */ - @VisibleForTesting - public long corruptBlockGSForTesting(final int blockIndex, Random rand) { - long oldGS = blockList[index2BlockId(blockIndex) + 2]; - while (blockList[index2BlockId(blockIndex) + 2] == oldGS) { - blockList[index2BlockId(blockIndex) + 2] = rand.nextInt(); + @Override + public long[] getBlockListAsLongs() { + long[] longs = new long[2+values.size()]; + longs[0] = finalizedBlocks; + longs[1] = numBlocks - finalizedBlocks; + for (int i=0; i < longs.length; i++) { + longs[i] = values.get(i); + } + return longs; } - return oldGS; - } - /** - * Corrupt the length of the block with the given index by truncation. - * Not meant to be used outside of tests. 
- */ - @VisibleForTesting - public long corruptBlockLengthForTesting(final int blockIndex, Random rand) { - long oldLength = blockList[index2BlockId(blockIndex) + 1]; - blockList[index2BlockId(blockIndex) + 1] = - rand.nextInt((int) oldLength - 1); - return oldLength; + @Override + public Iterator<BlockReportReplica> iterator() { + return new Iterator<BlockReportReplica>() { + private final BlockReportReplica block = new BlockReportReplica(); + final Iterator<Long> iter = values.iterator(); + private int currentBlockIndex = 0; + + @Override + public boolean hasNext() { + return currentBlockIndex < numBlocks; + } + + @Override + public BlockReportReplica next() { + if (currentBlockIndex == finalizedBlocks) { + // verify the presence of the delimiter block + readBlock(); + Preconditions.checkArgument(block.getBlockId() == -1 && + block.getNumBytes() == -1 && + block.getGenerationStamp() == -1, + "Invalid delimiter block"); + } + + readBlock(); + if (currentBlockIndex++ < finalizedBlocks) { + block.setState(ReplicaState.FINALIZED); + } else { + block.setState(ReplicaState.getState(iter.next().intValue())); + } + return block; + } + + private void readBlock() { + block.setBlockId(iter.next()); + block.setNumBytes(iter.next()); + block.setGenerationStamp(iter.next()); + } + + @Override + public void remove() { + throw new UnsupportedOperationException(); + } + }; + } } - /** - * Set the indexTh block - * @param index - the index of the block to set - * @param r - the block is set to the value of the this Replica - */ - private void setBlock(final int index, final Replica r) { - int pos = index2BlockId(index); - blockList[pos] = r.getBlockId(); - blockList[pos + 1] = r.getNumBytes(); - blockList[pos + 2] = r.getGenerationStamp(); - if(index < getNumberOfFinalizedReplicas()) - return; - assert r.getState() != ReplicaState.FINALIZED : - "Must be under-construction replica."; - blockList[pos + 3] = r.getState().getValue(); - } - - /** - * Set the indexTh block - * @param index - the index of the block to set - * @param b - the block is set to the value of the this Block - */ - private void setBlock(final int index, final Block b) { - int pos = index2BlockId(index); - blockList[pos] = b.getBlockId(); - blockList[pos + 1] = b.getNumBytes(); - blockList[pos + 2] = b.getGenerationStamp(); - } - - /** - * Set the invalid delimiting block between the finalized and - * the under-construction lists. - * The invalid block has all three fields set to -1. 
- * @param finalizedSzie - the size of the finalized list - */ - private void setDelimitingBlock(final int finalizedSzie) { - int idx = HEADER_SIZE + finalizedSzie * LONGS_PER_FINALIZED_BLOCK; - blockList[idx] = -1; - blockList[idx+1] = -1; - blockList[idx+2] = -1; - } - - public long getMaxGsInBlockList() { - long maxGs = -1; - Iterator<Block> iter = getBlockReportIterator(); - while (iter.hasNext()) { - Block b = iter.next(); - if (b.getGenerationStamp() > maxGs) { - maxGs = b.getGenerationStamp(); + @InterfaceAudience.Private + public static class BlockReportReplica extends Block implements Replica { + private ReplicaState state; + private BlockReportReplica() { + } + public BlockReportReplica(Block block) { + super(block); + if (block instanceof BlockReportReplica) { + this.state = ((BlockReportReplica)block).getState(); + } else { + this.state = ReplicaState.FINALIZED; } } - return maxGs; + public void setState(ReplicaState state) { + this.state = state; + } + @Override + public ReplicaState getState() { + return state; + } + @Override + public long getBytesOnDisk() { + return getNumBytes(); + } + @Override + public long getVisibleLength() { + throw new UnsupportedOperationException(); + } + @Override + public String getStorageUuid() { + throw new UnsupportedOperationException(); + } + @Override + public boolean isOnTransientStorage() { + throw new UnsupportedOperationException(); + } + @Override + public boolean equals(Object o) { + return super.equals(o); + } + @Override + public int hashCode() { + return super.hashCode(); + } } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/d324164a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolClientSideTranslatorPB.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolClientSideTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolClientSideTranslatorPB.java index 192916f..c4003f1 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolClientSideTranslatorPB.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolClientSideTranslatorPB.java @@ -26,6 +26,7 @@ import java.util.List; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hdfs.protocol.BlockListAsLongs; import org.apache.hadoop.hdfs.protocol.DatanodeID; import org.apache.hadoop.hdfs.protocol.ExtendedBlock; import org.apache.hadoop.hdfs.protocol.LocatedBlock; @@ -51,6 +52,7 @@ import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol; import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration; import org.apache.hadoop.hdfs.server.protocol.HeartbeatResponse; import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo; +import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo.Capability; import org.apache.hadoop.hdfs.server.protocol.ReceivedDeletedBlockInfo; import org.apache.hadoop.hdfs.server.protocol.StorageBlockReport; import org.apache.hadoop.hdfs.server.protocol.StorageReceivedDeletedBlocks; @@ -64,6 +66,7 @@ import org.apache.hadoop.ipc.RpcClientUtil; import org.apache.hadoop.net.NetUtils; import org.apache.hadoop.security.UserGroupInformation; +import 
com.google.common.annotations.VisibleForTesting; import com.google.protobuf.RpcController; import com.google.protobuf.ServiceException; @@ -83,6 +86,11 @@ public class DatanodeProtocolClientSideTranslatorPB implements VersionRequestProto.newBuilder().build(); private final static RpcController NULL_CONTROLLER = null; + @VisibleForTesting + public DatanodeProtocolClientSideTranslatorPB(DatanodeProtocolPB rpcProxy) { + this.rpcProxy = rpcProxy; + } + public DatanodeProtocolClientSideTranslatorPB(InetSocketAddress nameNodeAddr, Configuration conf) throws IOException { RPC.setProtocolEngine(conf, DatanodeProtocolPB.class, @@ -166,12 +174,20 @@ public class DatanodeProtocolClientSideTranslatorPB implements .newBuilder().setRegistration(PBHelper.convert(registration)) .setBlockPoolId(poolId); + boolean useBlocksBuffer = registration.getNamespaceInfo() + .isCapabilitySupported(Capability.STORAGE_BLOCK_REPORT_BUFFERS); + for (StorageBlockReport r : reports) { StorageBlockReportProto.Builder reportBuilder = StorageBlockReportProto .newBuilder().setStorage(PBHelper.convert(r.getStorage())); - long[] blocks = r.getBlocks(); - for (int i = 0; i < blocks.length; i++) { - reportBuilder.addBlocks(blocks[i]); + BlockListAsLongs blocks = r.getBlocks(); + if (useBlocksBuffer) { + reportBuilder.setNumberOfBlocks(blocks.getNumberOfBlocks()); + reportBuilder.addAllBlocksBuffers(blocks.getBlocksBuffers()); + } else { + for (long value : blocks.getBlockListAsLongs()) { + reportBuilder.addBlocks(value); + } } builder.addReports(reportBuilder.build()); } http://git-wip-us.apache.org/repos/asf/hadoop/blob/d324164a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolServerSideTranslatorPB.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolServerSideTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolServerSideTranslatorPB.java index 1a89090..e18081f 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolServerSideTranslatorPB.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolServerSideTranslatorPB.java @@ -21,6 +21,7 @@ package org.apache.hadoop.hdfs.protocolPB; import java.io.IOException; import java.util.List; +import org.apache.hadoop.hdfs.protocol.BlockListAsLongs; import org.apache.hadoop.hdfs.protocol.DatanodeID; import org.apache.hadoop.hdfs.protocol.LocatedBlock; import org.apache.hadoop.hdfs.protocol.RollingUpgradeStatus; @@ -58,6 +59,7 @@ import org.apache.hadoop.hdfs.server.protocol.StorageReceivedDeletedBlocks; import org.apache.hadoop.hdfs.server.protocol.StorageReport; import org.apache.hadoop.hdfs.server.protocol.VolumeFailureSummary; +import com.google.common.base.Preconditions; import com.google.protobuf.RpcController; import com.google.protobuf.ServiceException; @@ -145,10 +147,14 @@ public class DatanodeProtocolServerSideTranslatorPB implements int index = 0; for (StorageBlockReportProto s : request.getReportsList()) { - List<Long> blockIds = s.getBlocksList(); - long[] blocks = new long[blockIds.size()]; - for (int i = 0; i < blockIds.size(); i++) { - blocks[i] = blockIds.get(i); + final BlockListAsLongs blocks; + if (s.hasNumberOfBlocks()) { // new style buffer based reports + int num = (int)s.getNumberOfBlocks(); + 
Preconditions.checkState(s.getBlocksCount() == 0, + "cannot send both blocks list and buffers"); + blocks = BlockListAsLongs.decodeBuffers(num, s.getBlocksBuffersList()); + } else { + blocks = BlockListAsLongs.decodeLongs(s.getBlocksList()); } report[index++] = new StorageBlockReport(PBHelper.convert(s.getStorage()), blocks); http://git-wip-us.apache.org/repos/asf/hadoop/blob/d324164a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java index ee1603c..c428c2b 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java @@ -573,7 +573,7 @@ public class PBHelper { StorageInfoProto storage = info.getStorageInfo(); return new NamespaceInfo(storage.getNamespceID(), storage.getClusterID(), info.getBlockPoolID(), storage.getCTime(), info.getBuildVersion(), - info.getSoftwareVersion()); + info.getSoftwareVersion(), info.getCapabilities()); } public static NamenodeCommand convert(NamenodeCommandProto cmd) { @@ -1233,7 +1233,9 @@ public class PBHelper { .setBuildVersion(info.getBuildVersion()) .setUnused(0) .setStorageInfo(PBHelper.convert((StorageInfo)info)) - .setSoftwareVersion(info.getSoftwareVersion()).build(); + .setSoftwareVersion(info.getSoftwareVersion()) + .setCapabilities(info.getCapabilities()) + .build(); } // Located Block Arrays and Lists http://git-wip-us.apache.org/repos/asf/hadoop/blob/d324164a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java index c1a3e05..f155375 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java @@ -47,7 +47,7 @@ import org.apache.hadoop.hdfs.DFSUtil; import org.apache.hadoop.hdfs.HAUtil; import org.apache.hadoop.hdfs.protocol.Block; import org.apache.hadoop.hdfs.protocol.BlockListAsLongs; -import org.apache.hadoop.hdfs.protocol.BlockListAsLongs.BlockReportIterator; +import org.apache.hadoop.hdfs.protocol.BlockListAsLongs.BlockReportReplica; import org.apache.hadoop.hdfs.protocol.DatanodeID; import org.apache.hadoop.hdfs.protocol.DatanodeInfo; import org.apache.hadoop.hdfs.protocol.ExtendedBlock; @@ -1968,11 +1968,9 @@ public class BlockManager { if (report == null) return; assert (namesystem.hasWriteLock()); assert (storageInfo.numBlocks() == 0); - BlockReportIterator itBR = report.getBlockReportIterator(); - while(itBR.hasNext()) { - Block iblk = itBR.next(); - ReplicaState reportedState = itBR.getCurrentReplicaState(); + for (BlockReportReplica iblk : report) { + ReplicaState reportedState = iblk.getState(); if (shouldPostponeBlocksFromFuture && namesystem.isGenStampInFuture(iblk)) { @@ -2042,13 +2040,11 @@ public class BlockManager { int curIndex; if (newReport 
== null) { - newReport = new BlockListAsLongs(); + newReport = BlockListAsLongs.EMPTY; } // scan the report and process newly reported blocks - BlockReportIterator itBR = newReport.getBlockReportIterator(); - while(itBR.hasNext()) { - Block iblk = itBR.next(); - ReplicaState iState = itBR.getCurrentReplicaState(); + for (BlockReportReplica iblk : newReport) { + ReplicaState iState = iblk.getState(); BlockInfoContiguous storedBlock = processReportedBlock(storageInfo, iblk, iState, toAdd, toInvalidate, toCorrupt, toUC); http://git-wip-us.apache.org/repos/asf/hadoop/blob/d324164a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java index 3ba2f54..3c20f6b 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java @@ -228,7 +228,7 @@ class BPServiceActor implements Runnable { bpos.verifyAndSetNamespaceInfo(nsInfo); // Second phase of the handshake with the NN. - register(); + register(nsInfo); } // This is useful to make sure NN gets Heartbeat before Blockreport @@ -468,8 +468,7 @@ class BPServiceActor implements Runnable { for(Map.Entry<DatanodeStorage, BlockListAsLongs> kvPair : perVolumeBlockLists.entrySet()) { BlockListAsLongs blockList = kvPair.getValue(); - reports[i++] = new StorageBlockReport( - kvPair.getKey(), blockList.getBlockListAsLongs()); + reports[i++] = new StorageBlockReport(kvPair.getKey(), blockList); totalBlockCount += blockList.getNumberOfBlocks(); } @@ -774,10 +773,11 @@ class BPServiceActor implements Runnable { * * issued by the namenode to recognize registered datanodes. 
* + * @param nsInfo current NamespaceInfo * @see FSNamesystem#registerDatanode(DatanodeRegistration) * @throws IOException */ - void register() throws IOException { + void register(NamespaceInfo nsInfo) throws IOException { // The handshake() phase loaded the block pool storage // off disk - so update the bpRegistration object from that info bpRegistration = bpos.createRegistration(); @@ -788,6 +788,7 @@ class BPServiceActor implements Runnable { try { // Use returned registration from namenode with updated fields bpRegistration = bpNamenode.registerDatanode(bpRegistration); + bpRegistration.setNamespaceInfo(nsInfo); break; } catch(EOFException e) { // namenode might have just restarted LOG.info("Problem connecting to server: " + nnAddr + " :" @@ -915,9 +916,9 @@ class BPServiceActor implements Runnable { if (shouldRun()) { // re-retrieve namespace info to make sure that, if the NN // was restarted, we still match its version (HDFS-2120) - retrieveNamespaceInfo(); + NamespaceInfo nsInfo = retrieveNamespaceInfo(); // and re-register - register(); + register(nsInfo); scheduleHeartbeat(); } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/d324164a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java index 486acbc..d42c00c 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java @@ -1575,30 +1575,26 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> { Map<DatanodeStorage, BlockListAsLongs> blockReportsMap = new HashMap<DatanodeStorage, BlockListAsLongs>(); - Map<String, ArrayList<ReplicaInfo>> finalized = - new HashMap<String, ArrayList<ReplicaInfo>>(); - Map<String, ArrayList<ReplicaInfo>> uc = - new HashMap<String, ArrayList<ReplicaInfo>>(); + Map<String, BlockListAsLongs.Builder> builders = + new HashMap<String, BlockListAsLongs.Builder>(); List<FsVolumeImpl> curVolumes = getVolumes(); for (FsVolumeSpi v : curVolumes) { - finalized.put(v.getStorageID(), new ArrayList<ReplicaInfo>()); - uc.put(v.getStorageID(), new ArrayList<ReplicaInfo>()); + builders.put(v.getStorageID(), BlockListAsLongs.builder()); } synchronized(this) { for (ReplicaInfo b : volumeMap.replicas(bpid)) { switch(b.getState()) { case FINALIZED: - finalized.get(b.getVolume().getStorageID()).add(b); - break; case RBW: case RWR: - uc.get(b.getVolume().getStorageID()).add(b); + builders.get(b.getVolume().getStorageID()).add(b); break; case RUR: ReplicaUnderRecovery rur = (ReplicaUnderRecovery)b; - uc.get(rur.getVolume().getStorageID()).add(rur.getOriginalReplica()); + builders.get(rur.getVolume().getStorageID()) + .add(rur.getOriginalReplica()); break; case TEMPORARY: break; @@ -1609,10 +1605,8 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> { } for (FsVolumeImpl v : curVolumes) { - ArrayList<ReplicaInfo> finalizedList = finalized.get(v.getStorageID()); - ArrayList<ReplicaInfo> ucList = uc.get(v.getStorageID()); blockReportsMap.put(v.toDatanodeStorage(), - new BlockListAsLongs(finalizedList, ucList)); + 
builders.get(v.getStorageID()).build()); } return blockReportsMap; http://git-wip-us.apache.org/repos/asf/hadoop/blob/d324164a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java index f20fb35..059bd28 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java @@ -1302,7 +1302,7 @@ class NameNodeRpcServer implements NamenodeProtocols { final BlockManager bm = namesystem.getBlockManager(); boolean noStaleStorages = false; for(StorageBlockReport r : reports) { - final BlockListAsLongs blocks = new BlockListAsLongs(r.getBlocks()); + final BlockListAsLongs blocks = r.getBlocks(); // // BlockManager.processReport accumulates information of prior calls // for the same node and storage, so the value returned by the last http://git-wip-us.apache.org/repos/asf/hadoop/blob/d324164a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/DatanodeRegistration.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/DatanodeRegistration.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/DatanodeRegistration.java index 9db2fca..e788137 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/DatanodeRegistration.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/DatanodeRegistration.java @@ -40,6 +40,7 @@ public class DatanodeRegistration extends DatanodeID private final StorageInfo storageInfo; private ExportedBlockKeys exportedKeys; private final String softwareVersion; + private NamespaceInfo nsInfo; @VisibleForTesting public DatanodeRegistration(String uuid, DatanodeRegistration dnr) { @@ -77,6 +78,14 @@ public class DatanodeRegistration extends DatanodeID public int getVersion() { return storageInfo.getLayoutVersion(); } + + public void setNamespaceInfo(NamespaceInfo nsInfo) { + this.nsInfo = nsInfo; + } + + public NamespaceInfo getNamespaceInfo() { + return nsInfo; + } @Override // NodeRegistration public String getRegistrationID() { http://git-wip-us.apache.org/repos/asf/hadoop/blob/d324164a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/NamespaceInfo.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/NamespaceInfo.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/NamespaceInfo.java index 0733743..a7439a0 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/NamespaceInfo.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/NamespaceInfo.java @@ -29,6 +29,9 @@ import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.NodeType; import org.apache.hadoop.hdfs.server.namenode.NNStorage; import 
org.apache.hadoop.util.VersionInfo; +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; + /** * NamespaceInfo is returned by the name-node in reply * to a data-node handshake. @@ -40,19 +43,52 @@ public class NamespaceInfo extends StorageInfo { final String buildVersion; String blockPoolID = ""; // id of the block pool String softwareVersion; + long capabilities; + + // only authoritative on the server-side to determine advertisement to + // clients. enum will update the supported values + private static long CAPABILITIES_SUPPORTED = 0; + + public enum Capability { + UNKNOWN(false), + STORAGE_BLOCK_REPORT_BUFFERS(true); // use optimized ByteString buffers + private final long mask; + Capability(boolean isSupported) { + int bits = ordinal() - 1; + mask = (bits < 0) ? 0 : (1L << bits); + if (isSupported) { + CAPABILITIES_SUPPORTED |= mask; + } + } + public long getMask() { + return mask; + } + } + // defaults to enabled capabilites since this ctor is for server public NamespaceInfo() { super(NodeType.NAME_NODE); buildVersion = null; + capabilities = CAPABILITIES_SUPPORTED; } + // defaults to enabled capabilites since this ctor is for server public NamespaceInfo(int nsID, String clusterID, String bpID, long cT, String buildVersion, String softwareVersion) { + this(nsID, clusterID, bpID, cT, buildVersion, softwareVersion, + CAPABILITIES_SUPPORTED); + } + + // for use by server and/or client + public NamespaceInfo(int nsID, String clusterID, String bpID, + long cT, String buildVersion, String softwareVersion, + long capabilities) { super(HdfsConstants.NAMENODE_LAYOUT_VERSION, nsID, clusterID, cT, NodeType.NAME_NODE); blockPoolID = bpID; this.buildVersion = buildVersion; this.softwareVersion = softwareVersion; + this.capabilities = capabilities; } public NamespaceInfo(int nsID, String clusterID, String bpID, @@ -61,6 +97,22 @@ public class NamespaceInfo extends StorageInfo { VersionInfo.getVersion()); } + public long getCapabilities() { + return capabilities; + } + + @VisibleForTesting + public void setCapabilities(long capabilities) { + this.capabilities = capabilities; + } + + public boolean isCapabilitySupported(Capability capability) { + Preconditions.checkArgument(capability != Capability.UNKNOWN, + "cannot test for unknown capability"); + long mask = capability.getMask(); + return (capabilities & mask) == mask; + } + public String getBuildVersion() { return buildVersion; } http://git-wip-us.apache.org/repos/asf/hadoop/blob/d324164a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/StorageBlockReport.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/StorageBlockReport.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/StorageBlockReport.java index 1521693..4ef5ebc 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/StorageBlockReport.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/StorageBlockReport.java @@ -18,14 +18,16 @@ package org.apache.hadoop.hdfs.server.protocol; +import org.apache.hadoop.hdfs.protocol.BlockListAsLongs; + /** * Block report for a Datanode storage */ public class StorageBlockReport { private final DatanodeStorage storage; - private final long[] blocks; + private final BlockListAsLongs blocks; - public 
StorageBlockReport(DatanodeStorage storage, long[] blocks) { + public StorageBlockReport(DatanodeStorage storage, BlockListAsLongs blocks) { this.storage = storage; this.blocks = blocks; } @@ -34,7 +36,7 @@ public class StorageBlockReport { return storage; } - public long[] getBlocks() { + public BlockListAsLongs getBlocks() { return blocks; } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/d324164a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/DatanodeProtocol.proto ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/DatanodeProtocol.proto b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/DatanodeProtocol.proto index 348f346..7b3a4a9 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/DatanodeProtocol.proto +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/DatanodeProtocol.proto @@ -237,6 +237,8 @@ message BlockReportRequestProto { message StorageBlockReportProto { required DatanodeStorageProto storage = 1; // Storage repeated uint64 blocks = 2 [packed=true]; + optional uint64 numberOfBlocks = 3; + repeated bytes blocksBuffers = 4; } /** http://git-wip-us.apache.org/repos/asf/hadoop/blob/d324164a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/hdfs.proto ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/hdfs.proto b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/hdfs.proto index 97906b1..31e5585 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/hdfs.proto +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/hdfs.proto @@ -517,6 +517,7 @@ message NamespaceInfoProto { required string blockPoolID = 3; // block pool used by the namespace required StorageInfoProto storageInfo = 4;// Node information required string softwareVersion = 5; // Software version number (e.g. 2.0.0) + optional uint64 capabilities = 6 [default = 0]; // feature flags } /** http://git-wip-us.apache.org/repos/asf/hadoop/blob/d324164a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/protocol/TestBlockListAsLongs.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/protocol/TestBlockListAsLongs.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/protocol/TestBlockListAsLongs.java new file mode 100644 index 0000000..bebde18 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/protocol/TestBlockListAsLongs.java @@ -0,0 +1,237 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.hdfs.protocol; + +import static org.junit.Assert.assertArrayEquals; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; +import static org.mockito.Matchers.any; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.mock; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Random; +import java.util.concurrent.atomic.AtomicReference; + +import org.apache.hadoop.hdfs.DFSTestUtil; +import org.apache.hadoop.hdfs.protocol.BlockListAsLongs.BlockReportReplica; +import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.BlockReportRequestProto; +import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.BlockReportResponseProto; +import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolClientSideTranslatorPB; +import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolPB; +import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState; +import org.apache.hadoop.hdfs.server.datanode.FinalizedReplica; +import org.apache.hadoop.hdfs.server.datanode.Replica; +import org.apache.hadoop.hdfs.server.datanode.ReplicaBeingWritten; +import org.apache.hadoop.hdfs.server.datanode.ReplicaWaitingToBeRecovered; +import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration; +import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage; +import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo; +import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo.Capability; +import org.apache.hadoop.hdfs.server.protocol.StorageBlockReport; +import org.junit.Test; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; + +import com.google.protobuf.ByteString; +import com.google.protobuf.RpcController; +import com.google.protobuf.ServiceException; + +public class TestBlockListAsLongs { + static Block b1 = new Block(1, 11, 111); + static Block b2 = new Block(2, 22, 222); + static Block b3 = new Block(3, 33, 333); + static Block b4 = new Block(4, 44, 444); + + @Test + public void testEmptyReport() { + BlockListAsLongs blocks = checkReport(); + assertArrayEquals( + new long[] { + 0, 0, + -1, -1, -1 }, + blocks.getBlockListAsLongs()); + } + + @Test + public void testFinalized() { + BlockListAsLongs blocks = checkReport( + new FinalizedReplica(b1, null, null)); + assertArrayEquals( + new long[] { + 1, 0, + 1, 11, 111, + -1, -1, -1 }, + blocks.getBlockListAsLongs()); + } + + @Test + public void testUc() { + BlockListAsLongs blocks = checkReport( + new ReplicaBeingWritten(b1, null, null, null)); + assertArrayEquals( + new long[] { + 0, 1, + -1, -1, -1, + 1, 11, 111, ReplicaState.RBW.getValue() }, + blocks.getBlockListAsLongs()); + } + + @Test + public void testMix() { + BlockListAsLongs blocks = checkReport( + new FinalizedReplica(b1, null, null), + new FinalizedReplica(b2, null, null), + new ReplicaBeingWritten(b3, null, null, null), + new ReplicaWaitingToBeRecovered(b4, null, null)); + assertArrayEquals( + new long[] { + 2, 2, + 1, 11, 111, + 2, 22, 222, + -1, -1, -1, + 3, 33, 333, ReplicaState.RBW.getValue(), + 4, 44, 444, ReplicaState.RWR.getValue() }, + blocks.getBlockListAsLongs()); + } + + @Test + public void testFuzz() throws InterruptedException { + Replica[] replicas = new Replica[100000]; + Random rand = new Random(0); + for (int i=0; 
i<replicas.length; i++) { + Block b = new Block(rand.nextLong(), i, i<<4); + switch (rand.nextInt(2)) { + case 0: + replicas[i] = new FinalizedReplica(b, null, null); + break; + case 1: + replicas[i] = new ReplicaBeingWritten(b, null, null, null); + break; + case 2: + replicas[i] = new ReplicaWaitingToBeRecovered(b, null, null); + break; + } + } + checkReport(replicas); + } + + private BlockListAsLongs checkReport(Replica...replicas) { + Map<Long, Replica> expectedReplicas = new HashMap<>(); + for (Replica replica : replicas) { + expectedReplicas.put(replica.getBlockId(), replica); + } + expectedReplicas = Collections.unmodifiableMap(expectedReplicas); + + // encode the blocks and extract the buffers + BlockListAsLongs blocks = + BlockListAsLongs.encode(expectedReplicas.values()); + List<ByteString> buffers = blocks.getBlocksBuffers(); + + // convert to old-style list of longs + List<Long> longs = new ArrayList<Long>(); + for (long value : blocks.getBlockListAsLongs()) { + longs.add(value); + } + + // decode the buffers and verify its contents + BlockListAsLongs decodedBlocks = + BlockListAsLongs.decodeBuffers(expectedReplicas.size(), buffers); + checkReplicas(expectedReplicas, decodedBlocks); + + // decode the long and verify its contents + BlockListAsLongs decodedList = BlockListAsLongs.decodeLongs(longs); + checkReplicas(expectedReplicas, decodedList); + return blocks; + } + + private void checkReplicas(Map<Long,Replica> expectedReplicas, + BlockListAsLongs decodedBlocks) { + assertEquals(expectedReplicas.size(), decodedBlocks.getNumberOfBlocks()); + + Map<Long, Replica> reportReplicas = new HashMap<>(expectedReplicas); + for (BlockReportReplica replica : decodedBlocks) { + assertNotNull(replica); + Replica expected = reportReplicas.remove(replica.getBlockId()); + assertNotNull(expected); + assertEquals("wrong bytes", + expected.getNumBytes(), replica.getNumBytes()); + assertEquals("wrong genstamp", + expected.getGenerationStamp(), replica.getGenerationStamp()); + assertEquals("wrong replica state", + expected.getState(), replica.getState()); + } + assertTrue(reportReplicas.isEmpty()); + } + + @Test + public void testDatanodeDetect() throws ServiceException, IOException { + final AtomicReference<BlockReportRequestProto> request = + new AtomicReference<>(); + + // just capture the outgoing PB + DatanodeProtocolPB mockProxy = mock(DatanodeProtocolPB.class); + doAnswer(new Answer<BlockReportResponseProto>() { + public BlockReportResponseProto answer(InvocationOnMock invocation) { + Object[] args = invocation.getArguments(); + request.set((BlockReportRequestProto) args[1]); + return BlockReportResponseProto.newBuilder().build(); + } + }).when(mockProxy).blockReport(any(RpcController.class), + any(BlockReportRequestProto.class)); + + @SuppressWarnings("resource") + DatanodeProtocolClientSideTranslatorPB nn = + new DatanodeProtocolClientSideTranslatorPB(mockProxy); + + DatanodeRegistration reg = DFSTestUtil.getLocalDatanodeRegistration(); + NamespaceInfo nsInfo = new NamespaceInfo(1, "cluster", "bp", 1); + reg.setNamespaceInfo(nsInfo); + + Replica r = new FinalizedReplica(new Block(1, 2, 3), null, null); + BlockListAsLongs bbl = BlockListAsLongs.encode(Collections.singleton(r)); + DatanodeStorage storage = new DatanodeStorage("s1"); + StorageBlockReport[] sbr = { new StorageBlockReport(storage, bbl) }; + + // check DN sends new-style BR + request.set(null); + nsInfo.setCapabilities(Capability.STORAGE_BLOCK_REPORT_BUFFERS.getMask()); + nn.blockReport(reg, "pool", sbr); + 
BlockReportRequestProto proto = request.get(); + assertNotNull(proto); + assertTrue(proto.getReports(0).getBlocksList().isEmpty()); + assertFalse(proto.getReports(0).getBlocksBuffersList().isEmpty()); + + // back up to prior version and check DN sends old-style BR + request.set(null); + nsInfo.setCapabilities(Capability.UNKNOWN.getMask()); + nn.blockReport(reg, "pool", sbr); + proto = request.get(); + assertNotNull(proto); + assertFalse(proto.getReports(0).getBlocksList().isEmpty()); + assertTrue(proto.getReports(0).getBlocksBuffersList().isEmpty()); + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/d324164a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockManager.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockManager.java index 6d67c7d..d9ac9e5 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockManager.java @@ -555,12 +555,12 @@ public class TestBlockManager { reset(node); bm.processReport(node, new DatanodeStorage(ds.getStorageID()), - new BlockListAsLongs(null, null)); + BlockListAsLongs.EMPTY); assertEquals(1, ds.getBlockReportCount()); // send block report again, should NOT be processed reset(node); bm.processReport(node, new DatanodeStorage(ds.getStorageID()), - new BlockListAsLongs(null, null)); + BlockListAsLongs.EMPTY); assertEquals(1, ds.getBlockReportCount()); // re-register as if node restarted, should update existing node @@ -571,7 +571,7 @@ public class TestBlockManager { // send block report, should be processed after restart reset(node); bm.processReport(node, new DatanodeStorage(ds.getStorageID()), - new BlockListAsLongs(null, null)); + BlockListAsLongs.EMPTY); // Reinitialize as registration with empty storage list pruned // node.storageMap. 
ds = node.getStorageInfos()[0]; @@ -600,7 +600,7 @@ public class TestBlockManager { reset(node); doReturn(1).when(node).numBlocks(); bm.processReport(node, new DatanodeStorage(ds.getStorageID()), - new BlockListAsLongs(null, null)); + BlockListAsLongs.EMPTY); assertEquals(1, ds.getBlockReportCount()); } http://git-wip-us.apache.org/repos/asf/hadoop/blob/d324164a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/BlockReportTestBase.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/BlockReportTestBase.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/BlockReportTestBase.java index 8d9de7b..37c503c 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/BlockReportTestBase.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/BlockReportTestBase.java @@ -46,6 +46,7 @@ import org.apache.hadoop.hdfs.DistributedFileSystem; import org.apache.hadoop.hdfs.MiniDFSCluster; import org.apache.hadoop.hdfs.protocol.Block; import org.apache.hadoop.hdfs.protocol.BlockListAsLongs; +import org.apache.hadoop.hdfs.protocol.BlockListAsLongs.BlockReportReplica; import org.apache.hadoop.hdfs.protocol.ExtendedBlock; import org.apache.hadoop.hdfs.protocol.LocatedBlock; import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolClientSideTranslatorPB; @@ -146,22 +147,32 @@ public abstract class BlockReportTestBase { // Walk the list of blocks until we find one each to corrupt the // generation stamp and length, if so requested. - for (int i = 0; i < blockList.getNumberOfBlocks(); ++i) { + BlockListAsLongs.Builder builder = BlockListAsLongs.builder(); + for (BlockReportReplica block : blockList) { if (corruptOneBlockGs && !corruptedGs) { - blockList.corruptBlockGSForTesting(i, rand); - LOG.info("Corrupted the GS for block ID " + i); + long gsOld = block.getGenerationStamp(); + long gsNew; + do { + gsNew = rand.nextInt(); + } while (gsNew == gsOld); + block.setGenerationStamp(gsNew); + LOG.info("Corrupted the GS for block ID " + block); corruptedGs = true; } else if (corruptOneBlockLen && !corruptedLen) { - blockList.corruptBlockLengthForTesting(i, rand); - LOG.info("Corrupted the length for block ID " + i); + long lenOld = block.getNumBytes(); + long lenNew; + do { + lenNew = rand.nextInt((int)lenOld - 1); + } while (lenNew == lenOld); + block.setNumBytes(lenNew); + LOG.info("Corrupted the length for block ID " + block); corruptedLen = true; - } else { - break; } + builder.add(new BlockReportReplica(block)); } reports[reportIndex++] = - new StorageBlockReport(dnStorage, blockList.getBlockListAsLongs()); + new StorageBlockReport(dnStorage, builder.build()); } return reports; http://git-wip-us.apache.org/repos/asf/hadoop/blob/d324164a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java index a4ec8d5..5c7b4ac 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java +++ 
b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java @@ -52,7 +52,6 @@ import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi; import org.apache.hadoop.hdfs.server.datanode.fsdataset.LengthInputStream; import org.apache.hadoop.hdfs.server.datanode.fsdataset.ReplicaInputStreams; import org.apache.hadoop.hdfs.server.datanode.fsdataset.ReplicaOutputStreams; -import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsVolumeImpl; import org.apache.hadoop.hdfs.server.datanode.metrics.FSDatasetMBean; import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringBlock; import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage; @@ -271,7 +270,7 @@ public class SimulatedFSDataset implements FsDatasetSpi<FsVolumeSpi> { @Override public ReplicaState getState() { - return null; + return finalized ? ReplicaState.FINALIZED : ReplicaState.RBW; } @Override @@ -529,7 +528,7 @@ public class SimulatedFSDataset implements FsDatasetSpi<FsVolumeSpi> { } public synchronized void injectBlocks(String bpid, - Iterable<Block> injectBlocks) throws IOException { + Iterable<? extends Block> injectBlocks) throws IOException { ExtendedBlock blk = new ExtendedBlock(); if (injectBlocks != null) { for (Block b: injectBlocks) { // if any blocks in list is bad, reject list @@ -582,16 +581,16 @@ public class SimulatedFSDataset implements FsDatasetSpi<FsVolumeSpi> { } synchronized BlockListAsLongs getBlockReport(String bpid) { - final List<Replica> blocks = new ArrayList<Replica>(); + BlockListAsLongs.Builder report = BlockListAsLongs.builder(); final Map<Block, BInfo> map = blockMap.get(bpid); if (map != null) { for (BInfo b : map.values()) { if (b.isFinalized()) { - blocks.add(b); + report.add(b); } } } - return new BlockListAsLongs(blocks, null); + return report.build(); } @Override http://git-wip-us.apache.org/repos/asf/hadoop/blob/d324164a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockHasMultipleReplicasOnSameDN.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockHasMultipleReplicasOnSameDN.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockHasMultipleReplicasOnSameDN.java index 1152c74..3238d6a 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockHasMultipleReplicasOnSameDN.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockHasMultipleReplicasOnSameDN.java @@ -107,17 +107,18 @@ public class TestBlockHasMultipleReplicasOnSameDN { StorageBlockReport reports[] = new StorageBlockReport[cluster.getStoragesPerDatanode()]; - ArrayList<Block> blocks = new ArrayList<Block>(); + ArrayList<Replica> blocks = new ArrayList<Replica>(); for (LocatedBlock locatedBlock : locatedBlocks.getLocatedBlocks()) { - blocks.add(locatedBlock.getBlock().getLocalBlock()); + Block localBlock = locatedBlock.getBlock().getLocalBlock(); + blocks.add(new FinalizedReplica(localBlock, null, null)); } + BlockListAsLongs bll = BlockListAsLongs.encode(blocks); for (int i = 0; i < cluster.getStoragesPerDatanode(); ++i) { - BlockListAsLongs bll = new BlockListAsLongs(blocks); FsVolumeSpi v = dn.getFSDataset().getVolumes().get(i); DatanodeStorage dns = new DatanodeStorage(v.getStorageID()); - reports[i] = new StorageBlockReport(dns, 
bll.getBlockListAsLongs()); + reports[i] = new StorageBlockReport(dns, bll); } // Should not assert! http://git-wip-us.apache.org/repos/asf/hadoop/blob/d324164a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeVolumeFailure.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeVolumeFailure.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeVolumeFailure.java index ba786d1..9cbad6d 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeVolumeFailure.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeVolumeFailure.java @@ -25,11 +25,9 @@ import static org.junit.Assert.assertTrue; import static org.junit.Assume.assumeTrue; import java.io.File; -import java.io.FilenameFilter; import java.io.IOException; import java.net.InetSocketAddress; import java.net.Socket; -import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -188,7 +186,7 @@ public class TestDataNodeVolumeFailure { DatanodeStorage dnStorage = kvPair.getKey(); BlockListAsLongs blockList = kvPair.getValue(); reports[reportIndex++] = - new StorageBlockReport(dnStorage, blockList.getBlockListAsLongs()); + new StorageBlockReport(dnStorage, blockList); } cluster.getNameNodeRpc().blockReport(dnR, bpid, reports); http://git-wip-us.apache.org/repos/asf/hadoop/blob/d324164a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDnRespectsBlockReportSplitThreshold.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDnRespectsBlockReportSplitThreshold.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDnRespectsBlockReportSplitThreshold.java index 7058d71..a5e4d4e 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDnRespectsBlockReportSplitThreshold.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDnRespectsBlockReportSplitThreshold.java @@ -98,7 +98,7 @@ public class TestDnRespectsBlockReportSplitThreshold { assertThat(reports.length, is(expectedReportsPerCall)); for (StorageBlockReport report : reports) { - BlockListAsLongs blockList = new BlockListAsLongs(report.getBlocks()); + BlockListAsLongs blockList = report.getBlocks(); numBlocksReported += blockList.getNumberOfBlocks(); } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/d324164a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalDatasetImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalDatasetImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalDatasetImpl.java index 5a440c4..0865e11 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalDatasetImpl.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalDatasetImpl.java 
@@ -195,7 +195,7 @@ public class ExternalDatasetImpl implements FsDatasetSpi<ExternalVolumeImpl> { final Map<DatanodeStorage, BlockListAsLongs> result = new HashMap<DatanodeStorage, BlockListAsLongs>(); - result.put(storage, new BlockListAsLongs(null, null)); + result.put(storage, BlockListAsLongs.EMPTY); return result; } http://git-wip-us.apache.org/repos/asf/hadoop/blob/d324164a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NNThroughputBenchmark.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NNThroughputBenchmark.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NNThroughputBenchmark.java index c11abfc..bc3c6b5 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NNThroughputBenchmark.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NNThroughputBenchmark.java @@ -26,6 +26,7 @@ import java.util.EnumSet; import java.util.List; import com.google.common.base.Preconditions; + import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; @@ -36,6 +37,7 @@ import org.apache.hadoop.hdfs.DFSTestUtil; import org.apache.hadoop.hdfs.HdfsConfiguration; import org.apache.hadoop.hdfs.protocol.Block; import org.apache.hadoop.hdfs.protocol.BlockListAsLongs; +import org.apache.hadoop.hdfs.protocol.BlockListAsLongs.BlockReportReplica; import org.apache.hadoop.hdfs.protocol.DatanodeID; import org.apache.hadoop.hdfs.protocol.DatanodeInfo; import org.apache.hadoop.hdfs.protocol.ExtendedBlock; @@ -891,9 +893,9 @@ public class NNThroughputBenchmark implements Tool { NamespaceInfo nsInfo; DatanodeRegistration dnRegistration; DatanodeStorage storage; //only one storage - final ArrayList<Block> blocks; + final ArrayList<BlockReportReplica> blocks; int nrBlocks; // actual number of blocks - long[] blockReportList; + BlockListAsLongs blockReportList; final int dnIdx; private static int getNodePort(int num) throws IOException { @@ -904,7 +906,7 @@ public class NNThroughputBenchmark implements Tool { TinyDatanode(int dnIdx, int blockCapacity) throws IOException { this.dnIdx = dnIdx; - this.blocks = new ArrayList<Block>(blockCapacity); + this.blocks = new ArrayList<BlockReportReplica>(blockCapacity); this.nrBlocks = 0; } @@ -934,8 +936,7 @@ public class NNThroughputBenchmark implements Tool { //first block reports storage = new DatanodeStorage(DatanodeStorage.generateUuid()); final StorageBlockReport[] reports = { - new StorageBlockReport(storage, - new BlockListAsLongs(null, null).getBlockListAsLongs()) + new StorageBlockReport(storage, BlockListAsLongs.EMPTY) }; nameNodeProto.blockReport(dnRegistration, nameNode.getNamesystem().getBlockPoolId(), reports); @@ -968,19 +969,21 @@ public class NNThroughputBenchmark implements Tool { } return false; } - blocks.set(nrBlocks, blk); + blocks.set(nrBlocks, new BlockReportReplica(blk)); nrBlocks++; return true; } void formBlockReport() { // fill remaining slots with blocks that do not exist - for(int idx = blocks.size()-1; idx >= nrBlocks; idx--) - blocks.set(idx, new Block(blocks.size() - idx, 0, 0)); - blockReportList = new BlockListAsLongs(blocks).getBlockListAsLongs(); + for (int idx = blocks.size()-1; idx >= nrBlocks; idx--) { + Block block = new Block(blocks.size() - idx, 0, 0); + blocks.set(idx, new 
BlockReportReplica(block)); + } + blockReportList = BlockListAsLongs.EMPTY; } - long[] getBlockReportList() { + BlockListAsLongs getBlockReportList() { return blockReportList; } http://git-wip-us.apache.org/repos/asf/hadoop/blob/d324164a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestDeadDatanode.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestDeadDatanode.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestDeadDatanode.java index fb1418a..ee80b33 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestDeadDatanode.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestDeadDatanode.java @@ -30,6 +30,7 @@ import org.apache.hadoop.hdfs.DFSTestUtil; import org.apache.hadoop.hdfs.HdfsConfiguration; import org.apache.hadoop.hdfs.MiniDFSCluster; import org.apache.hadoop.hdfs.protocol.Block; +import org.apache.hadoop.hdfs.protocol.BlockListAsLongs; import org.apache.hadoop.hdfs.server.datanode.DataNode; import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils; import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand; @@ -104,7 +105,7 @@ public class TestDeadDatanode { // Ensure blockReport from dead datanode is rejected with IOException StorageBlockReport[] report = { new StorageBlockReport( new DatanodeStorage(reg.getDatanodeUuid()), - new long[] { 0L, 0L, 0L }) }; + BlockListAsLongs.EMPTY) }; try { dnp.blockReport(reg, poolId, report); fail("Expected IOException is not thrown"); http://git-wip-us.apache.org/repos/asf/hadoop/blob/d324164a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFSImage.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFSImage.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFSImage.java index f7dad18..7b9ea93 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFSImage.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFSImage.java @@ -41,6 +41,7 @@ import org.apache.hadoop.hdfs.protocol.HdfsConstants; import org.apache.hadoop.hdfs.protocol.HdfsConstants.SafeModeAction; import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoContiguous; import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.BlockUCState; +import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.StartupOption; import org.apache.hadoop.hdfs.server.namenode.LeaseManager.Lease; import org.apache.hadoop.hdfs.util.MD5FileUtils; import org.apache.hadoop.test.GenericTestUtils; @@ -219,6 +220,7 @@ public class TestFSImage { .manageDataDfsDirs(false) .manageNameDfsDirs(false) .waitSafeMode(false) + .startupOption(StartupOption.UPGRADE) .build(); try { FileSystem fs = cluster.getFileSystem(); http://git-wip-us.apache.org/repos/asf/hadoop/blob/d324164a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/tools/offlineEditsViewer/TestOfflineEditsViewer.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/tools/offlineEditsViewer/TestOfflineEditsViewer.java 
b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/tools/offlineEditsViewer/TestOfflineEditsViewer.java index 0e605ac..2ad7b60 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/tools/offlineEditsViewer/TestOfflineEditsViewer.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/tools/offlineEditsViewer/TestOfflineEditsViewer.java @@ -32,6 +32,7 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.hdfs.DFSTestUtil; import org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes; +import org.apache.hadoop.hdfs.server.namenode.NameNodeLayoutVersion; import org.apache.hadoop.hdfs.server.namenode.OfflineEditsViewerHelper; import org.apache.hadoop.hdfs.tools.offlineEditsViewer.OfflineEditsViewer.Flags; import org.apache.hadoop.test.PathUtils; @@ -140,8 +141,8 @@ public class TestOfflineEditsViewer { assertEquals(0, runOev(editsReparsed, editsParsedXml2, "xml", false)); // judgment time - assertTrue("Test round trip", - filesEqualIgnoreTrailingZeros(editsParsedXml, editsParsedXml2)); + assertTrue("Test round trip", FileUtils.contentEqualsIgnoreEOL( + new File(editsParsedXml), new File(editsParsedXml2), "UTF-8")); os.close(); } @@ -238,6 +239,10 @@ public class TestOfflineEditsViewer { ByteBuffer small = ByteBuffer.wrap(DFSTestUtil.loadFile(filenameSmall)); ByteBuffer large = ByteBuffer.wrap(DFSTestUtil.loadFile(filenameLarge)); + // OEV outputs with the latest layout version, so tweak the old file's + // contents to have latest version so checkedin binary files don't + // require frequent updates + small.put(3, (byte)NameNodeLayoutVersion.CURRENT_LAYOUT_VERSION); // now correct if it's otherwise if (small.capacity() > large.capacity()) {
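----------------------------------------------------------------------

Editor's note: the following is a minimal, illustrative sketch of the reworked BlockListAsLongs API that this patch introduces and that TestBlockListAsLongs above exercises (encode, getBlocksBuffers, decodeBuffers, getBlockListAsLongs, decodeLongs). It is not part of the commit; the class and method names are taken from the diff, while the driver class name and the sample block values are hypothetical.

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
import org.apache.hadoop.hdfs.protocol.BlockListAsLongs.BlockReportReplica;
import org.apache.hadoop.hdfs.server.datanode.FinalizedReplica;
import org.apache.hadoop.hdfs.server.datanode.Replica;

import com.google.protobuf.ByteString;

// Hypothetical driver class, for illustration only.
public class BlockReportRoundTrip {
  public static void main(String[] args) {
    // Build a tiny report from finalized replicas; volume and directory are
    // irrelevant here, so they are passed as null just as the test does.
    List<Replica> replicas = Arrays.<Replica>asList(
        new FinalizedReplica(new Block(1, 11, 111), null, null),
        new FinalizedReplica(new Block(2, 22, 222), null, null));

    // New-style encoding: the report is kept in compact protobuf buffers.
    BlockListAsLongs report = BlockListAsLongs.encode(replicas);
    List<ByteString> buffers = report.getBlocksBuffers();

    // Legacy encoding: the old long[] layout, still produced for NameNodes
    // that do not advertise the STORAGE_BLOCK_REPORT_BUFFERS capability.
    long[] longs = report.getBlockListAsLongs();

    // Either form decodes back to the same set of replicas.
    BlockListAsLongs fromBuffers =
        BlockListAsLongs.decodeBuffers(replicas.size(), buffers);
    List<Long> boxed = new ArrayList<Long>();
    for (long l : longs) {
      boxed.add(l);
    }
    BlockListAsLongs fromLongs = BlockListAsLongs.decodeLongs(boxed);

    for (BlockReportReplica r : fromBuffers) {
      System.out.println(r.getBlockId() + " " + r.getNumBytes() + " "
          + r.getGenerationStamp() + " " + r.getState());
    }
    System.out.println("decoded " + fromLongs.getNumberOfBlocks() + " blocks");
  }
}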