http://git-wip-us.apache.org/repos/asf/hadoop/blob/ecf36348/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoContiguousUnderConstruction.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoContiguousUnderConstruction.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoContiguousUnderConstruction.java index 7ca6419..ee6f441 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoContiguousUnderConstruction.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoContiguousUnderConstruction.java @@ -31,7 +31,8 @@ import org.apache.hadoop.hdfs.server.namenode.NameNode; * Represents a block that is currently being constructed.<br> * This is usually the last block of a file opened for write or append. */ -public class BlockInfoContiguousUnderConstruction extends BlockInfoContiguous { +public class BlockInfoContiguousUnderConstruction extends BlockInfoContiguous + implements BlockInfoUnderConstruction{ /** Block state. See {@link BlockUCState} */ private BlockUCState blockUCState; @@ -60,101 +61,6 @@ public class BlockInfoContiguousUnderConstruction extends BlockInfoContiguous { private Block truncateBlock; /** - * ReplicaUnderConstruction contains information about replicas while - * they are under construction. - * The GS, the length and the state of the replica is as reported by - * the data-node. - * It is not guaranteed, but expected, that data-nodes actually have - * corresponding replicas. 
- */ - static class ReplicaUnderConstruction extends Block { - private final DatanodeStorageInfo expectedLocation; - private ReplicaState state; - private boolean chosenAsPrimary; - - ReplicaUnderConstruction(Block block, - DatanodeStorageInfo target, - ReplicaState state) { - super(block); - this.expectedLocation = target; - this.state = state; - this.chosenAsPrimary = false; - } - - /** - * Expected block replica location as assigned when the block was allocated. - * This defines the pipeline order. - * It is not guaranteed, but expected, that the data-node actually has - * the replica. - */ - private DatanodeStorageInfo getExpectedStorageLocation() { - return expectedLocation; - } - - /** - * Get replica state as reported by the data-node. - */ - ReplicaState getState() { - return state; - } - - /** - * Whether the replica was chosen for recovery. - */ - boolean getChosenAsPrimary() { - return chosenAsPrimary; - } - - /** - * Set replica state. - */ - void setState(ReplicaState s) { - state = s; - } - - /** - * Set whether this replica was chosen for recovery. - */ - void setChosenAsPrimary(boolean chosenAsPrimary) { - this.chosenAsPrimary = chosenAsPrimary; - } - - /** - * Is data-node the replica belongs to alive. - */ - boolean isAlive() { - return expectedLocation.getDatanodeDescriptor().isAlive; - } - - @Override // Block - public int hashCode() { - return super.hashCode(); - } - - @Override // Block - public boolean equals(Object obj) { - // Sufficient to rely on super's implementation - return (this == obj) || super.equals(obj); - } - - @Override - public String toString() { - final StringBuilder b = new StringBuilder(50); - appendStringTo(b); - return b.toString(); - } - - @Override - public void appendStringTo(StringBuilder sb) { - sb.append("ReplicaUC[") - .append(expectedLocation) - .append("|") - .append(state) - .append("]"); - } - } - - /** * Create block and set its state to * {@link BlockUCState#UNDER_CONSTRUCTION}. 
*/ @@ -169,7 +75,7 @@ public class BlockInfoContiguousUnderConstruction extends BlockInfoContiguous { BlockUCState state, DatanodeStorageInfo[] targets) { super(blk, replication); assert getBlockUCState() != BlockUCState.COMPLETE : - "BlockInfoUnderConstruction cannot be in COMPLETE state"; + "BlockInfoContiguousUnderConstruction cannot be in COMPLETE state"; this.blockUCState = state; setExpectedLocations(targets); } @@ -183,34 +89,34 @@ public class BlockInfoContiguousUnderConstruction extends BlockInfoContiguous { * the client or it does not have at least a minimal number of replicas * reported from data-nodes. */ - BlockInfo convertToCompleteBlock() throws IOException { + @Override + public BlockInfo convertToCompleteBlock() throws IOException { assert getBlockUCState() != BlockUCState.COMPLETE : "Trying to convert a COMPLETE block"; return new BlockInfoContiguous(this); } - /** Set expected locations */ + @Override public void setExpectedLocations(DatanodeStorageInfo[] targets) { int numLocations = targets == null ? 0 : targets.length; - this.replicas = new ArrayList<ReplicaUnderConstruction>(numLocations); - for(int i = 0; i < numLocations; i++) - replicas.add( - new ReplicaUnderConstruction(this, targets[i], ReplicaState.RBW)); + this.replicas = new ArrayList<>(numLocations); + for(int i = 0; i < numLocations; i++) { + replicas.add(new ReplicaUnderConstruction(this, targets[i], + ReplicaState.RBW)); + } } - /** - * Create array of expected replica locations - * (as has been assigned by chooseTargets()). - */ + @Override public DatanodeStorageInfo[] getExpectedStorageLocations() { int numLocations = replicas == null ? 
0 : replicas.size(); DatanodeStorageInfo[] storages = new DatanodeStorageInfo[numLocations]; - for(int i = 0; i < numLocations; i++) + for (int i = 0; i < numLocations; i++) { storages[i] = replicas.get(i).getExpectedStorageLocation(); + } return storages; } - /** Get the number of expected locations */ + @Override public int getNumExpectedLocations() { return replicas == null ? 0 : replicas.size(); } @@ -228,25 +134,26 @@ public class BlockInfoContiguousUnderConstruction extends BlockInfoContiguous { blockUCState = s; } - /** Get block recovery ID */ + @Override public long getBlockRecoveryId() { return blockRecoveryId; } - /** Get recover block */ + @Override public Block getTruncateBlock() { return truncateBlock; } + @Override + public Block toBlock(){ + return this; + } + public void setTruncateBlock(Block recoveryBlock) { this.truncateBlock = recoveryBlock; } - /** - * Process the recorded replicas. When about to commit or finish the - * pipeline recovery sort out bad replicas. - * @param genStamp The final generation stamp for the block. - */ + @Override public void setGenerationStampAndVerifyReplicas(long genStamp) { // Set the generation stamp for the block. setGenerationStamp(genStamp); @@ -264,13 +171,8 @@ public class BlockInfoContiguousUnderConstruction extends BlockInfoContiguous { } } - /** - * Commit block's length and generation stamp as reported by the client. - * Set block state to {@link BlockUCState#COMMITTED}. - * @param block - contains client reported block length and generation - * @throws IOException if block ids are inconsistent. 
- */ - void commitBlock(Block block) throws IOException { + @Override + public void commitBlock(Block block) throws IOException { if(getBlockId() != block.getBlockId()) throw new IOException("Trying to commit inconsistent block: id = " + block.getBlockId() + ", expected id = " + getBlockId()); @@ -280,31 +182,27 @@ public class BlockInfoContiguousUnderConstruction extends BlockInfoContiguous { setGenerationStampAndVerifyReplicas(block.getGenerationStamp()); } - /** - * Initialize lease recovery for this block. - * Find the first alive data-node starting from the previous primary and - * make it primary. - */ + @Override public void initializeBlockRecovery(long recoveryId) { setBlockUCState(BlockUCState.UNDER_RECOVERY); blockRecoveryId = recoveryId; if (replicas.size() == 0) { NameNode.blockStateChangeLog.warn("BLOCK*" - + " BlockInfoUnderConstruction.initLeaseRecovery:" + + " BlockInfoContiguousUnderConstruction.initLeaseRecovery:" + " No blocks found, lease removed."); } boolean allLiveReplicasTriedAsPrimary = true; - for (int i = 0; i < replicas.size(); i++) { + for (ReplicaUnderConstruction replica : replicas) { // Check if all replicas have been tried or not. - if (replicas.get(i).isAlive()) { - allLiveReplicasTriedAsPrimary = - (allLiveReplicasTriedAsPrimary && replicas.get(i).getChosenAsPrimary()); + if (replica.isAlive()) { + allLiveReplicasTriedAsPrimary = (allLiveReplicasTriedAsPrimary && + replica.getChosenAsPrimary()); } } if (allLiveReplicasTriedAsPrimary) { // Just set all the replicas to be chosen whether they are alive or not. 
- for (int i = 0; i < replicas.size(); i++) { - replicas.get(i).setChosenAsPrimary(false); + for (ReplicaUnderConstruction replica : replicas) { + replica.setChosenAsPrimary(false); } } long mostRecentLastUpdate = 0; @@ -325,16 +223,17 @@ public class BlockInfoContiguousUnderConstruction extends BlockInfoContiguous { } } if (primary != null) { - primary.getExpectedStorageLocation().getDatanodeDescriptor().addBlockToBeRecovered(this); + primary.getExpectedStorageLocation().getDatanodeDescriptor() + .addBlockToBeRecovered(this); primary.setChosenAsPrimary(true); NameNode.blockStateChangeLog.debug( "BLOCK* {} recovery started, primary={}", this, primary); } } - void addReplicaIfNotPresent(DatanodeStorageInfo storage, - Block block, - ReplicaState rState) { + @Override + public void addReplicaIfNotPresent(DatanodeStorageInfo storage, + Block block, ReplicaState rState) { Iterator<ReplicaUnderConstruction> it = replicas.iterator(); while (it.hasNext()) { ReplicaUnderConstruction r = it.next(); @@ -358,18 +257,6 @@ public class BlockInfoContiguousUnderConstruction extends BlockInfoContiguous { replicas.add(new ReplicaUnderConstruction(block, storage, rState)); } - @Override // BlockInfo - // BlockInfoUnderConstruction participates in maps the same way as BlockInfo - public int hashCode() { - return super.hashCode(); - } - - @Override // BlockInfo - public boolean equals(Object obj) { - // Sufficient to rely on super's implementation - return (this == obj) || super.equals(obj); - } - @Override public String toString() { final StringBuilder b = new StringBuilder(100);
http://git-wip-us.apache.org/repos/asf/hadoop/blob/ecf36348/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoStriped.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoStriped.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoStriped.java new file mode 100644 index 0000000..6674510 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoStriped.java @@ -0,0 +1,286 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.blockmanagement; + +import org.apache.hadoop.hdfs.protocol.Block; +import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.BlockUCState; +import org.apache.hadoop.hdfs.util.StripedBlockUtil; +import org.apache.hadoop.io.erasurecode.ECSchema; + +import static org.apache.hadoop.hdfs.protocol.HdfsConstants.BLOCK_STRIPED_CELL_SIZE; + +/** + * Subclass of {@link BlockInfo}, presenting a block group in erasure coding. 
+ * + * We still use triplets to store DatanodeStorageInfo for each block in the + * block group, as well as the previous/next block in the corresponding + * DatanodeStorageInfo. For a (m+k) block group, the first (m+k) triplet units + * are sorted and strictly mapped to the corresponding block. + * + * Normally each block belonging to the group is stored in only one DataNode. + * However, it is possible that some block is over-replicated. Thus the triplet + * array's size can be larger than (m+k). Thus currently we use an extra byte + * array to record the block index for each triplet. + */ +public class BlockInfoStriped extends BlockInfo { + private final ECSchema schema; + private final int cellSize; + /** + * Always the same size as triplets. Records the block index for each triplet + * TODO: actually this is only necessary for over-replicated block. Thus can + * be further optimized to save memory usage. + */ + private byte[] indices; + + public BlockInfoStriped(Block blk, ECSchema schema, int cellSize) { + super(blk, (short) (schema.getNumDataUnits() + schema.getNumParityUnits())); + indices = new byte[schema.getNumDataUnits() + schema.getNumParityUnits()]; + initIndices(); + this.schema = schema; + this.cellSize = cellSize; + } + + BlockInfoStriped(BlockInfoStriped b) { + this(b, b.getSchema(), b.getCellSize()); + this.setBlockCollection(b.getBlockCollection()); + } + + public short getTotalBlockNum() { + return (short) (this.schema.getNumDataUnits() + + this.schema.getNumParityUnits()); + } + + public short getDataBlockNum() { + return (short) this.schema.getNumDataUnits(); + } + + public short getParityBlockNum() { + return (short) this.schema.getNumParityUnits(); + } + + /** + * If the block is committed/completed and its length is less than a full + * stripe, it returns the number of actual data blocks. + * Otherwise it returns the number of data units specified by schema. 
+ */ + public short getRealDataBlockNum() { + if (isComplete() || getBlockUCState() == BlockUCState.COMMITTED) { + return (short) Math.min(getDataBlockNum(), + (getNumBytes() - 1) / BLOCK_STRIPED_CELL_SIZE + 1); + } else { + return getDataBlockNum(); + } + } + + public short getRealTotalBlockNum() { + return (short) (getRealDataBlockNum() + getParityBlockNum()); + } + + public ECSchema getSchema() { + return schema; + } + + public int getCellSize() { + return cellSize; + } + + private void initIndices() { + for (int i = 0; i < indices.length; i++) { + indices[i] = -1; + } + } + + private int findSlot() { + int i = getTotalBlockNum(); + for (; i < getCapacity(); i++) { + if (getStorageInfo(i) == null) { + return i; + } + } + // need to expand the triplet size + ensureCapacity(i + 1, true); + return i; + } + + @Override + boolean addStorage(DatanodeStorageInfo storage, Block reportedBlock) { + int blockIndex = BlockIdManager.getBlockIndex(reportedBlock); + int index = blockIndex; + DatanodeStorageInfo old = getStorageInfo(index); + if (old != null && !old.equals(storage)) { // over replicated + // check if the storage has been stored + int i = findStorageInfo(storage); + if (i == -1) { + index = findSlot(); + } else { + return true; + } + } + addStorage(storage, index, blockIndex); + return true; + } + + private void addStorage(DatanodeStorageInfo storage, int index, + int blockIndex) { + setStorageInfo(index, storage); + setNext(index, null); + setPrevious(index, null); + indices[index] = (byte) blockIndex; + } + + private int findStorageInfoFromEnd(DatanodeStorageInfo storage) { + final int len = getCapacity(); + for(int idx = len - 1; idx >= 0; idx--) { + DatanodeStorageInfo cur = getStorageInfo(idx); + if (storage.equals(cur)) { + return idx; + } + } + return -1; + } + + int getStorageBlockIndex(DatanodeStorageInfo storage) { + int i = this.findStorageInfo(storage); + return i == -1 ? 
-1 : indices[i]; + } + + /** + * Identify the block stored in the given datanode storage. Note that + * the returned block has the same block Id with the one seen/reported by the + * DataNode. + */ + Block getBlockOnStorage(DatanodeStorageInfo storage) { + int index = getStorageBlockIndex(storage); + if (index < 0) { + return null; + } else { + Block block = new Block(this); + block.setBlockId(this.getBlockId() + index); + return block; + } + } + + @Override + boolean removeStorage(DatanodeStorageInfo storage) { + int dnIndex = findStorageInfoFromEnd(storage); + if (dnIndex < 0) { // the node is not found + return false; + } + assert getPrevious(dnIndex) == null && getNext(dnIndex) == null : + "Block is still in the list and must be removed first."; + // set the triplet to null + setStorageInfo(dnIndex, null); + setNext(dnIndex, null); + setPrevious(dnIndex, null); + indices[dnIndex] = -1; + return true; + } + + private void ensureCapacity(int totalSize, boolean keepOld) { + if (getCapacity() < totalSize) { + Object[] old = triplets; + byte[] oldIndices = indices; + triplets = new Object[totalSize * 3]; + indices = new byte[totalSize]; + initIndices(); + + if (keepOld) { + System.arraycopy(old, 0, triplets, 0, old.length); + System.arraycopy(oldIndices, 0, indices, 0, oldIndices.length); + } + } + } + + @Override + void replaceBlock(BlockInfo newBlock) { + assert newBlock instanceof BlockInfoStriped; + BlockInfoStriped newBlockGroup = (BlockInfoStriped) newBlock; + final int size = getCapacity(); + newBlockGroup.ensureCapacity(size, false); + for (int i = 0; i < size; i++) { + final DatanodeStorageInfo storage = this.getStorageInfo(i); + if (storage != null) { + final int blockIndex = indices[i]; + final boolean removed = storage.removeBlock(this); + assert removed : "currentBlock not found."; + + newBlockGroup.addStorage(storage, i, blockIndex); + storage.insertToList(newBlockGroup); + } + } + } + + public long spaceConsumed() { + // In case striped blocks, total 
usage by this striped blocks should + // be the total of data blocks and parity blocks because + // `getNumBytes` is the total of actual data block size. + return StripedBlockUtil.spaceConsumedByStripedBlock(getNumBytes(), + this.schema.getNumDataUnits(), this.schema.getNumParityUnits(), + BLOCK_STRIPED_CELL_SIZE); + } + + @Override + public final boolean isStriped() { + return true; + } + + @Override + public int numNodes() { + assert this.triplets != null : "BlockInfo is not initialized"; + assert triplets.length % 3 == 0 : "Malformed BlockInfo"; + int num = 0; + for (int idx = getCapacity()-1; idx >= 0; idx--) { + if (getStorageInfo(idx) != null) { + num++; + } + } + return num; + } + + /** + * Convert a complete block to an under construction block. + * @return BlockInfoUnderConstruction - an under construction block. + */ + public BlockInfoStripedUnderConstruction convertToBlockUnderConstruction( + BlockUCState s, DatanodeStorageInfo[] targets) { + final BlockInfoStripedUnderConstruction ucBlock; + if(isComplete()) { + ucBlock = new BlockInfoStripedUnderConstruction(this, schema, cellSize, + s, targets); + ucBlock.setBlockCollection(getBlockCollection()); + } else { + // the block is already under construction + ucBlock = (BlockInfoStripedUnderConstruction) this; + ucBlock.setBlockUCState(s); + ucBlock.setExpectedLocations(targets); + ucBlock.setBlockCollection(getBlockCollection()); + } + return ucBlock; + } + + @Override + final boolean hasNoStorage() { + final int len = getCapacity(); + for(int idx = 0; idx < len; idx++) { + if (getStorageInfo(idx) != null) { + return false; + } + } + return true; + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/ecf36348/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoStripedUnderConstruction.java ---------------------------------------------------------------------- diff --git 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoStripedUnderConstruction.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoStripedUnderConstruction.java new file mode 100644 index 0000000..5f78096 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoStripedUnderConstruction.java @@ -0,0 +1,298 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.blockmanagement; + +import org.apache.hadoop.hdfs.protocol.Block; +import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.BlockUCState; +import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState; +import org.apache.hadoop.hdfs.server.namenode.NameNode; +import org.apache.hadoop.io.erasurecode.ECSchema; + +import java.io.IOException; + +import static org.apache.hadoop.hdfs.server.common.HdfsServerConstants.BlockUCState.COMPLETE; +import static org.apache.hadoop.hdfs.server.common.HdfsServerConstants.BlockUCState.UNDER_CONSTRUCTION; + +/** + * Represents a striped block that is currently being constructed. 
+ * This is usually the last block of a file opened for write or append. + */ +public class BlockInfoStripedUnderConstruction extends BlockInfoStriped + implements BlockInfoUnderConstruction{ + private BlockUCState blockUCState; + + /** + * Block replicas as assigned when the block was allocated. + */ + private ReplicaUnderConstruction[] replicas; + + /** + * Index of the primary data node doing the recovery. Useful for log + * messages. + */ + private int primaryNodeIndex = -1; + + /** + * The new generation stamp, which this block will have + * after the recovery succeeds. Also used as a recovery id to identify + * the right recovery if any of the abandoned recoveries re-appear. + */ + private long blockRecoveryId = 0; + + /** + * Constructor with null storage targets. + */ + public BlockInfoStripedUnderConstruction(Block blk, ECSchema schema, + int cellSize) { + this(blk, schema, cellSize, UNDER_CONSTRUCTION, null); + } + + /** + * Create a striped block that is currently being constructed. + */ + public BlockInfoStripedUnderConstruction(Block blk, ECSchema schema, + int cellSize, BlockUCState state, DatanodeStorageInfo[] targets) { + super(blk, schema, cellSize); + assert getBlockUCState() != COMPLETE : + "BlockInfoStripedUnderConstruction cannot be in COMPLETE state"; + this.blockUCState = state; + setExpectedLocations(targets); + } + + @Override + public BlockInfoStriped convertToCompleteBlock() throws IOException { + assert getBlockUCState() != COMPLETE : + "Trying to convert a COMPLETE block"; + return new BlockInfoStriped(this); + } + + /** Set expected locations */ + @Override + public void setExpectedLocations(DatanodeStorageInfo[] targets) { + int numLocations = targets == null ? 
0 : targets.length; + this.replicas = new ReplicaUnderConstruction[numLocations]; + for(int i = 0; i < numLocations; i++) { + // when creating a new block we simply sequentially assign block index to + // each storage + Block blk = new Block(this.getBlockId() + i, 0, this.getGenerationStamp()); + replicas[i] = new ReplicaUnderConstruction(blk, targets[i], + ReplicaState.RBW); + } + } + + /** + * Create array of expected replica locations + * (as has been assigned by chooseTargets()). + */ + @Override + public DatanodeStorageInfo[] getExpectedStorageLocations() { + int numLocations = getNumExpectedLocations(); + DatanodeStorageInfo[] storages = new DatanodeStorageInfo[numLocations]; + for (int i = 0; i < numLocations; i++) { + storages[i] = replicas[i].getExpectedStorageLocation(); + } + return storages; + } + + /** @return the index array indicating the block index in each storage */ + public int[] getBlockIndices() { + int numLocations = getNumExpectedLocations(); + int[] indices = new int[numLocations]; + for (int i = 0; i < numLocations; i++) { + indices[i] = BlockIdManager.getBlockIndex(replicas[i]); + } + return indices; + } + + @Override + public int getNumExpectedLocations() { + return replicas == null ? 0 : replicas.length; + } + + /** + * Return the state of the block under construction. + * @see BlockUCState + */ + @Override // BlockInfo + public BlockUCState getBlockUCState() { + return blockUCState; + } + + void setBlockUCState(BlockUCState s) { + blockUCState = s; + } + + @Override + public long getBlockRecoveryId() { + return blockRecoveryId; + } + + @Override + public Block getTruncateBlock() { + return null; + } + + @Override + public Block toBlock(){ + return this; + } + + @Override + public void setGenerationStampAndVerifyReplicas(long genStamp) { + // Set the generation stamp for the block. + setGenerationStamp(genStamp); + if (replicas == null) + return; + + // Remove the replicas with wrong gen stamp. + // The replica list is unchanged. 
+ for (ReplicaUnderConstruction r : replicas) { + if (genStamp != r.getGenerationStamp()) { + r.getExpectedStorageLocation().removeBlock(this); + NameNode.blockStateChangeLog.info("BLOCK* Removing stale replica " + + "from location: {}", r.getExpectedStorageLocation()); + } + } + } + + @Override + public void commitBlock(Block block) throws IOException { + if (getBlockId() != block.getBlockId()) { + throw new IOException("Trying to commit inconsistent block: id = " + + block.getBlockId() + ", expected id = " + getBlockId()); + } + blockUCState = BlockUCState.COMMITTED; + this.set(getBlockId(), block.getNumBytes(), block.getGenerationStamp()); + // Sort out invalid replicas. + setGenerationStampAndVerifyReplicas(block.getGenerationStamp()); + } + + @Override + public void initializeBlockRecovery(long recoveryId) { + setBlockUCState(BlockUCState.UNDER_RECOVERY); + blockRecoveryId = recoveryId; + if (replicas == null || replicas.length == 0) { + NameNode.blockStateChangeLog.warn("BLOCK*" + + " BlockInfoStripedUnderConstruction.initLeaseRecovery:" + + " No blocks found, lease removed."); + // sets primary node index and return. + primaryNodeIndex = -1; + return; + } + boolean allLiveReplicasTriedAsPrimary = true; + for (ReplicaUnderConstruction replica : replicas) { + // Check if all replicas have been tried or not. + if (replica.isAlive()) { + allLiveReplicasTriedAsPrimary = (allLiveReplicasTriedAsPrimary && + replica.getChosenAsPrimary()); + } + } + if (allLiveReplicasTriedAsPrimary) { + // Just set all the replicas to be chosen whether they are alive or not. + for (ReplicaUnderConstruction replica : replicas) { + replica.setChosenAsPrimary(false); + } + } + long mostRecentLastUpdate = 0; + ReplicaUnderConstruction primary = null; + primaryNodeIndex = -1; + for(int i = 0; i < replicas.length; i++) { + // Skip alive replicas which have been chosen for recovery. 
+ if (!(replicas[i].isAlive() && !replicas[i].getChosenAsPrimary())) { + continue; + } + final ReplicaUnderConstruction ruc = replicas[i]; + final long lastUpdate = ruc.getExpectedStorageLocation() + .getDatanodeDescriptor().getLastUpdateMonotonic(); + if (lastUpdate > mostRecentLastUpdate) { + primaryNodeIndex = i; + primary = ruc; + mostRecentLastUpdate = lastUpdate; + } + } + if (primary != null) { + primary.getExpectedStorageLocation().getDatanodeDescriptor() + .addBlockToBeRecovered(this); + primary.setChosenAsPrimary(true); + NameNode.blockStateChangeLog.info( + "BLOCK* {} recovery started, primary={}", this, primary); + } + } + + @Override + public void addReplicaIfNotPresent(DatanodeStorageInfo storage, + Block reportedBlock, ReplicaState rState) { + if (replicas == null) { + replicas = new ReplicaUnderConstruction[1]; + replicas[0] = new ReplicaUnderConstruction(reportedBlock, storage, rState); + } else { + for (int i = 0; i < replicas.length; i++) { + DatanodeStorageInfo expected = replicas[i].getExpectedStorageLocation(); + if (expected == storage) { + replicas[i].setBlockId(reportedBlock.getBlockId()); + replicas[i].setGenerationStamp(reportedBlock.getGenerationStamp()); + return; + } else if (expected != null && expected.getDatanodeDescriptor() == + storage.getDatanodeDescriptor()) { + // The Datanode reported that the block is on a different storage + // than the one chosen by BlockPlacementPolicy. This can occur as + // we allow Datanodes to choose the target storage. Update our + // state by removing the stale entry and adding a new one. 
+ replicas[i] = new ReplicaUnderConstruction(reportedBlock, storage, + rState); + return; + } + } + ReplicaUnderConstruction[] newReplicas = + new ReplicaUnderConstruction[replicas.length + 1]; + System.arraycopy(replicas, 0, newReplicas, 0, replicas.length); + newReplicas[newReplicas.length - 1] = new ReplicaUnderConstruction( + reportedBlock, storage, rState); + replicas = newReplicas; + } + } + + @Override + public String toString() { + final StringBuilder b = new StringBuilder(100); + appendStringTo(b); + return b.toString(); + } + + @Override + public void appendStringTo(StringBuilder sb) { + super.appendStringTo(sb); + appendUCParts(sb); + } + + private void appendUCParts(StringBuilder sb) { + sb.append("{UCState=").append(blockUCState). + append(", primaryNodeIndex=").append(primaryNodeIndex). + append(", replicas=["); + if (replicas != null) { + int i = 0; + for (ReplicaUnderConstruction r : replicas) { + r.appendStringTo(sb); + if (++i < replicas.length) { + sb.append(", "); + } + } + } + sb.append("]}"); + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/ecf36348/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoUnderConstruction.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoUnderConstruction.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoUnderConstruction.java new file mode 100644 index 0000000..10a8cae --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoUnderConstruction.java @@ -0,0 +1,84 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.blockmanagement; + +import java.io.IOException; + +import org.apache.hadoop.hdfs.protocol.Block; +import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.BlockUCState; +import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState; + +public interface BlockInfoUnderConstruction { + /** + * Create array of expected replica locations + * (as has been assigned by chooseTargets()). + */ + public DatanodeStorageInfo[] getExpectedStorageLocations(); + + /** Get recover block */ + public Block getTruncateBlock(); + + /** Convert to a Block object */ + public Block toBlock(); + + /** Get block recovery ID */ + public long getBlockRecoveryId(); + + /** Get the number of expected locations */ + public int getNumExpectedLocations(); + + /** Set expected locations */ + public void setExpectedLocations(DatanodeStorageInfo[] targets); + + /** + * Process the recorded replicas. When about to commit or finish the + * pipeline recovery sort out bad replicas. + * @param genStamp The final generation stamp for the block. + */ + public void setGenerationStampAndVerifyReplicas(long genStamp); + + /** + * Initialize lease recovery for this block. + * Find the first alive data-node starting from the previous primary and + * make it primary. 
+ */ + public void initializeBlockRecovery(long recoveryId); + + /** Add the reported replica if it is not already in the replica list. */ + public void addReplicaIfNotPresent(DatanodeStorageInfo storage, + Block reportedBlock, ReplicaState rState); + + /** + * Commit block's length and generation stamp as reported by the client. + * Set block state to {@link BlockUCState#COMMITTED}. + * @param block - contains client reported block length and generation + * @throws IOException if block ids are inconsistent. + */ + public void commitBlock(Block block) throws IOException; + + /** + * Convert an under construction block to a complete block. + * + * @return a complete block. + * @throws IOException + * if the state of the block (the generation stamp and the length) + * has not been committed by the client or it does not have at least + * a minimal number of replicas reported from data-nodes. + */ + public BlockInfo convertToCompleteBlock() throws IOException; +}