HDFS-8909. Erasure coding: update BlockInfoContiguousUC and BlockInfoStripedUC to use BlockUnderConstructionFeature. Contributed by Jing Zhao.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/164cbe64 Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/164cbe64 Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/164cbe64 Branch: refs/heads/trunk Commit: 164cbe643988f878f0f4100a4de51783e5b6738e Parents: 067ec8c Author: Walter Su <[email protected]> Authored: Thu Aug 27 16:02:30 2015 +0800 Committer: Walter Su <[email protected]> Committed: Thu Aug 27 16:02:30 2015 +0800 ---------------------------------------------------------------------- .../hadoop-hdfs/CHANGES-HDFS-EC-7285.txt | 3 + .../hdfs/server/blockmanagement/BlockInfo.java | 105 +++++-- .../blockmanagement/BlockInfoContiguous.java | 23 -- .../BlockInfoContiguousUnderConstruction.java | 281 ------------------ .../blockmanagement/BlockInfoStriped.java | 21 -- .../BlockInfoStripedUnderConstruction.java | 297 ------------------- .../BlockInfoUnderConstruction.java | 84 ------ .../server/blockmanagement/BlockManager.java | 96 +++--- .../BlockUnderConstructionFeature.java | 269 +++++++++++++++++ .../blockmanagement/DatanodeDescriptor.java | 10 +- .../server/blockmanagement/DatanodeManager.java | 24 +- .../hdfs/server/namenode/FSDirTruncateOp.java | 42 +-- .../hdfs/server/namenode/FSDirWriteFileOp.java | 31 +- .../hdfs/server/namenode/FSEditLogLoader.java | 15 +- .../hdfs/server/namenode/FSImageFormat.java | 9 +- .../server/namenode/FSImageFormatPBINode.java | 9 +- .../server/namenode/FSImageSerialization.java | 6 +- .../hdfs/server/namenode/FSNamesystem.java | 40 ++- .../hadoop/hdfs/server/namenode/INodeFile.java | 33 +-- .../server/namenode/snapshot/FileDiffList.java | 3 +- .../org/apache/hadoop/hdfs/DFSTestUtil.java | 9 +- .../TestBlockInfoUnderConstruction.java | 17 +- .../blockmanagement/TestBlockManager.java | 9 +- .../blockmanagement/TestHeartbeatHandling.java | 21 +- .../blockmanagement/TestReplicationPolicy.java | 12 +- .../server/namenode/TestAddStripedBlocks.java 
| 38 ++- .../namenode/TestBlockUnderConstruction.java | 6 +- .../TestCommitBlockSynchronization.java | 9 +- .../hdfs/server/namenode/TestFileTruncate.java | 5 +- .../server/namenode/TestStripedINodeFile.java | 11 +- .../namenode/ha/TestRetryCacheWithHA.java | 11 +- .../namenode/snapshot/SnapshotTestHelper.java | 6 +- 32 files changed, 577 insertions(+), 978 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/164cbe64/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-EC-7285.txt ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-EC-7285.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-EC-7285.txt index 8b25e68..28cc34a 100755 --- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-EC-7285.txt +++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-EC-7285.txt @@ -403,3 +403,6 @@ HDFS-8838. Erasure Coding: Tolerate datanode failures in DFSStripedOutputStream when the data length is small. (szetszwo via waltersu4549) + + HDFS-8909. Erasure coding: update BlockInfoContiguousUC and BlockInfoStripedUC + to use BlockUnderConstructionFeature. 
(Jing Zhao via waltersu4549) http://git-wip-us.apache.org/repos/asf/hadoop/blob/164cbe64/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfo.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfo.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfo.java index bf11914..f440e14 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfo.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfo.java @@ -17,8 +17,10 @@ */ package org.apache.hadoop.hdfs.server.blockmanagement; +import java.io.IOException; import java.util.LinkedList; +import com.google.common.base.Preconditions; import org.apache.hadoop.hdfs.protocol.Block; import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.BlockUCState; import org.apache.hadoop.util.LightWeightGSet; @@ -52,6 +54,8 @@ public abstract class BlockInfo extends Block */ protected Object[] triplets; + private BlockUnderConstructionFeature uc; + /** * Construct an entry for blocksmap * @param size the block's replication factor, or the total number of blocks @@ -287,26 +291,6 @@ public abstract class BlockInfo extends Block return this; } - /** - * BlockInfo represents a block that is not being constructed. - * In order to start modifying the block, the BlockInfo should be converted to - * {@link BlockInfoContiguousUnderConstruction} or - * {@link BlockInfoStripedUnderConstruction}. - * @return {@link BlockUCState#COMPLETE} - */ - public BlockUCState getBlockUCState() { - return BlockUCState.COMPLETE; - } - - /** - * Is this block complete? 
- * - * @return true if the state of the block is {@link BlockUCState#COMPLETE} - */ - public boolean isComplete() { - return getBlockUCState().equals(BlockUCState.COMPLETE); - } - public boolean isDeleted() { return (bc == null); } @@ -332,4 +316,85 @@ public abstract class BlockInfo extends Block public void setNext(LightWeightGSet.LinkedElement next) { this.nextLinkedElement = next; } + + /* UnderConstruction Feature related */ + + public BlockUnderConstructionFeature getUnderConstructionFeature() { + return uc; + } + + public BlockUCState getBlockUCState() { + return uc == null ? BlockUCState.COMPLETE : uc.getBlockUCState(); + } + + /** + * Is this block complete? + * + * @return true if the state of the block is {@link BlockUCState#COMPLETE} + */ + public boolean isComplete() { + return getBlockUCState().equals(BlockUCState.COMPLETE); + } + + /** + * Add/Update the under construction feature. + */ + public void convertToBlockUnderConstruction(BlockUCState s, + DatanodeStorageInfo[] targets) { + if (isComplete()) { + uc = new BlockUnderConstructionFeature(this, s, targets, this.isStriped()); + } else { + // the block is already under construction + uc.setBlockUCState(s); + uc.setExpectedLocations(this, targets, this.isStriped()); + } + } + + /** + * Convert an under construction block to a complete block. + * + * @return BlockInfo - a complete block. + * @throws IOException if the state of the block + * (the generation stamp and the length) has not been committed by + * the client or it does not have at least a minimal number of replicas + * reported from data-nodes. + */ + BlockInfo convertToCompleteBlock() throws IOException { + assert getBlockUCState() != BlockUCState.COMPLETE : + "Trying to convert a COMPLETE block"; + uc = null; + return this; + } + + /** + * Process the recorded replicas. When about to commit or finish the + * pipeline recovery sort out bad replicas. + * @param genStamp The final generation stamp for the block. 
+ */ + public void setGenerationStampAndVerifyReplicas(long genStamp) { + Preconditions.checkState(uc != null && !isComplete()); + // Set the generation stamp for the block. + setGenerationStamp(genStamp); + + // Remove the replicas with wrong gen stamp + uc.removeStaleReplicas(this); + } + + /** + * Commit block's length and generation stamp as reported by the client. + * Set block state to {@link BlockUCState#COMMITTED}. + * @param block - contains client reported block length and generation + * @throws IOException if block ids are inconsistent. + */ + void commitBlock(Block block) throws IOException { + if (getBlockId() != block.getBlockId()) { + throw new IOException("Trying to commit inconsistent block: id = " + + block.getBlockId() + ", expected id = " + getBlockId()); + } + Preconditions.checkState(!isComplete()); + uc.commit(); + this.set(getBlockId(), block.getNumBytes(), block.getGenerationStamp()); + // Sort out invalid replicas. + setGenerationStampAndVerifyReplicas(block.getGenerationStamp()); + } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/164cbe64/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoContiguous.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoContiguous.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoContiguous.java index bb9bf5b..12b4fd3 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoContiguous.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoContiguous.java @@ -19,7 +19,6 @@ package org.apache.hadoop.hdfs.server.blockmanagement; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.hdfs.protocol.Block; -import 
org.apache.hadoop.hdfs.server.common.HdfsServerConstants.BlockUCState; /** * Subclass of {@link BlockInfo}, used for a block with replication scheme. @@ -123,28 +122,6 @@ public class BlockInfoContiguous extends BlockInfo { } } - /** - * Convert a complete block to an under construction block. - * @return BlockInfoUnderConstruction - an under construction block. - */ - public BlockInfoContiguousUnderConstruction convertToBlockUnderConstruction( - BlockUCState s, DatanodeStorageInfo[] targets) { - if(isComplete()) { - BlockInfoContiguousUnderConstruction ucBlock = - new BlockInfoContiguousUnderConstruction(this, - getBlockCollection().getPreferredBlockReplication(), s, targets); - ucBlock.setBlockCollection(getBlockCollection()); - return ucBlock; - } - // the block is already under construction - BlockInfoContiguousUnderConstruction ucBlock = - (BlockInfoContiguousUnderConstruction) this; - ucBlock.setBlockUCState(s); - ucBlock.setExpectedLocations(targets); - ucBlock.setBlockCollection(getBlockCollection()); - return ucBlock; - } - @Override public final boolean isStriped() { return false; http://git-wip-us.apache.org/repos/asf/hadoop/blob/164cbe64/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoContiguousUnderConstruction.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoContiguousUnderConstruction.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoContiguousUnderConstruction.java deleted file mode 100644 index 96b209d..0000000 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoContiguousUnderConstruction.java +++ /dev/null @@ -1,281 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. 
See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hdfs.server.blockmanagement; - -import java.io.IOException; -import java.util.ArrayList; -import java.util.Iterator; -import java.util.List; - -import org.apache.hadoop.hdfs.protocol.Block; -import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.BlockUCState; -import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState; -import org.apache.hadoop.hdfs.server.namenode.NameNode; - -/** - * Represents a block that is currently being constructed.<br> - * This is usually the last block of a file opened for write or append. - */ -public class BlockInfoContiguousUnderConstruction extends BlockInfoContiguous - implements BlockInfoUnderConstruction{ - /** Block state. See {@link BlockUCState} */ - private BlockUCState blockUCState; - - /** - * Block replicas as assigned when the block was allocated. - * This defines the pipeline order. - */ - private List<ReplicaUnderConstruction> replicas; - - /** - * Index of the primary data node doing the recovery. Useful for log - * messages. - */ - private int primaryNodeIndex = -1; - - /** - * The new generation stamp, which this block will have - * after the recovery succeeds. Also used as a recovery id to identify - * the right recovery if any of the abandoned recoveries re-appear. 
- */ - private long blockRecoveryId = 0; - - /** - * The block source to use in the event of copy-on-write truncate. - */ - private Block truncateBlock; - - /** - * Create block and set its state to - * {@link BlockUCState#UNDER_CONSTRUCTION}. - */ - public BlockInfoContiguousUnderConstruction(Block blk, short replication) { - this(blk, replication, BlockUCState.UNDER_CONSTRUCTION, null); - } - - /** - * Create a block that is currently being constructed. - */ - public BlockInfoContiguousUnderConstruction(Block blk, short replication, - BlockUCState state, DatanodeStorageInfo[] targets) { - super(blk, replication); - assert getBlockUCState() != BlockUCState.COMPLETE : - "BlockInfoContiguousUnderConstruction cannot be in COMPLETE state"; - this.blockUCState = state; - setExpectedLocations(targets); - } - - @Override - public BlockInfoContiguous convertToCompleteBlock() throws IOException { - assert getBlockUCState() != BlockUCState.COMPLETE : - "Trying to convert a COMPLETE block"; - return new BlockInfoContiguous(this); - } - - @Override - public void setExpectedLocations(DatanodeStorageInfo[] targets) { - int numLocations = targets == null ? 0 : targets.length; - this.replicas = new ArrayList<>(numLocations); - for(int i = 0; i < numLocations; i++) { - replicas.add(new ReplicaUnderConstruction(this, targets[i], - ReplicaState.RBW)); - } - } - - @Override - public DatanodeStorageInfo[] getExpectedStorageLocations() { - int numLocations = replicas == null ? 0 : replicas.size(); - DatanodeStorageInfo[] storages = new DatanodeStorageInfo[numLocations]; - for (int i = 0; i < numLocations; i++) { - storages[i] = replicas.get(i).getExpectedStorageLocation(); - } - return storages; - } - - @Override - public int getNumExpectedLocations() { - return replicas == null ? 0 : replicas.size(); - } - - /** - * Return the state of the block under construction. 
- * @see BlockUCState - */ - @Override // BlockInfo - public BlockUCState getBlockUCState() { - return blockUCState; - } - - void setBlockUCState(BlockUCState s) { - blockUCState = s; - } - - @Override - public long getBlockRecoveryId() { - return blockRecoveryId; - } - - @Override - public Block getTruncateBlock() { - return truncateBlock; - } - - @Override - public Block toBlock(){ - return this; - } - - public void setTruncateBlock(Block recoveryBlock) { - this.truncateBlock = recoveryBlock; - } - - @Override - public void setGenerationStampAndVerifyReplicas(long genStamp) { - // Set the generation stamp for the block. - setGenerationStamp(genStamp); - if (replicas == null) - return; - - // Remove the replicas with wrong gen stamp. - // The replica list is unchanged. - for (ReplicaUnderConstruction r : replicas) { - if (genStamp != r.getGenerationStamp()) { - r.getExpectedStorageLocation().removeBlock(this); - NameNode.blockStateChangeLog.debug("BLOCK* Removing stale replica " - + "from location: {}", r.getExpectedStorageLocation()); - } - } - } - - @Override - public void commitBlock(Block block) throws IOException { - if(getBlockId() != block.getBlockId()) - throw new IOException("Trying to commit inconsistent block: id = " - + block.getBlockId() + ", expected id = " + getBlockId()); - blockUCState = BlockUCState.COMMITTED; - this.set(getBlockId(), block.getNumBytes(), block.getGenerationStamp()); - // Sort out invalid replicas. 
- setGenerationStampAndVerifyReplicas(block.getGenerationStamp()); - } - - @Override - public void initializeBlockRecovery(long recoveryId) { - setBlockUCState(BlockUCState.UNDER_RECOVERY); - blockRecoveryId = recoveryId; - if (replicas.size() == 0) { - NameNode.blockStateChangeLog.warn("BLOCK*" - + " BlockInfoContiguousUnderConstruction.initLeaseRecovery:" - + " No blocks found, lease removed."); - } - boolean allLiveReplicasTriedAsPrimary = true; - for (ReplicaUnderConstruction replica : replicas) { - // Check if all replicas have been tried or not. - if (replica.isAlive()) { - allLiveReplicasTriedAsPrimary = (allLiveReplicasTriedAsPrimary && - replica.getChosenAsPrimary()); - } - } - if (allLiveReplicasTriedAsPrimary) { - // Just set all the replicas to be chosen whether they are alive or not. - for (ReplicaUnderConstruction replica : replicas) { - replica.setChosenAsPrimary(false); - } - } - long mostRecentLastUpdate = 0; - ReplicaUnderConstruction primary = null; - primaryNodeIndex = -1; - for(int i = 0; i < replicas.size(); i++) { - // Skip alive replicas which have been chosen for recovery. 
- if (!(replicas.get(i).isAlive() && !replicas.get(i).getChosenAsPrimary())) { - continue; - } - final ReplicaUnderConstruction ruc = replicas.get(i); - final long lastUpdate = ruc.getExpectedStorageLocation() - .getDatanodeDescriptor().getLastUpdateMonotonic(); - if (lastUpdate > mostRecentLastUpdate) { - primaryNodeIndex = i; - primary = ruc; - mostRecentLastUpdate = lastUpdate; - } - } - if (primary != null) { - primary.getExpectedStorageLocation().getDatanodeDescriptor() - .addBlockToBeRecovered(this); - primary.setChosenAsPrimary(true); - NameNode.blockStateChangeLog.debug( - "BLOCK* {} recovery started, primary={}", this, primary); - } - } - - @Override - public void addReplicaIfNotPresent(DatanodeStorageInfo storage, - Block block, ReplicaState rState) { - Iterator<ReplicaUnderConstruction> it = replicas.iterator(); - while (it.hasNext()) { - ReplicaUnderConstruction r = it.next(); - DatanodeStorageInfo expectedLocation = r.getExpectedStorageLocation(); - if(expectedLocation == storage) { - // Record the gen stamp from the report - r.setGenerationStamp(block.getGenerationStamp()); - return; - } else if (expectedLocation != null && - expectedLocation.getDatanodeDescriptor() == - storage.getDatanodeDescriptor()) { - - // The Datanode reported that the block is on a different storage - // than the one chosen by BlockPlacementPolicy. This can occur as - // we allow Datanodes to choose the target storage. Update our - // state by removing the stale entry and adding a new one. 
- it.remove(); - break; - } - } - replicas.add(new ReplicaUnderConstruction(block, storage, rState)); - } - - @Override - public String toString() { - final StringBuilder b = new StringBuilder(100); - appendStringTo(b); - return b.toString(); - } - - @Override - public void appendStringTo(StringBuilder sb) { - super.appendStringTo(sb); - appendUCParts(sb); - } - - private void appendUCParts(StringBuilder sb) { - sb.append("{UCState=").append(blockUCState) - .append(", truncateBlock=" + truncateBlock) - .append(", primaryNodeIndex=").append(primaryNodeIndex) - .append(", replicas=["); - if (replicas != null) { - Iterator<ReplicaUnderConstruction> iter = replicas.iterator(); - if (iter.hasNext()) { - iter.next().appendStringTo(sb); - while (iter.hasNext()) { - sb.append(", "); - iter.next().appendStringTo(sb); - } - } - } - sb.append("]}"); - } -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/164cbe64/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoStriped.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoStriped.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoStriped.java index 14d2fcc..6093776 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoStriped.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoStriped.java @@ -245,27 +245,6 @@ public class BlockInfoStriped extends BlockInfo { return num; } - /** - * Convert a complete block to an under construction block. - * @return BlockInfoUnderConstruction - an under construction block. 
- */ - public BlockInfoStripedUnderConstruction convertToBlockUnderConstruction( - BlockUCState s, DatanodeStorageInfo[] targets) { - final BlockInfoStripedUnderConstruction ucBlock; - if(isComplete()) { - ucBlock = new BlockInfoStripedUnderConstruction(this, ecPolicy, - s, targets); - ucBlock.setBlockCollection(getBlockCollection()); - } else { - // the block is already under construction - ucBlock = (BlockInfoStripedUnderConstruction) this; - ucBlock.setBlockUCState(s); - ucBlock.setExpectedLocations(targets); - ucBlock.setBlockCollection(getBlockCollection()); - } - return ucBlock; - } - @Override final boolean hasNoStorage() { final int len = getCapacity(); http://git-wip-us.apache.org/repos/asf/hadoop/blob/164cbe64/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoStripedUnderConstruction.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoStripedUnderConstruction.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoStripedUnderConstruction.java deleted file mode 100644 index 9de8294..0000000 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoStripedUnderConstruction.java +++ /dev/null @@ -1,297 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hdfs.server.blockmanagement; - -import org.apache.hadoop.hdfs.protocol.Block; -import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.BlockUCState; -import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState; -import org.apache.hadoop.hdfs.server.namenode.NameNode; -import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy; - -import java.io.IOException; - -import static org.apache.hadoop.hdfs.server.common.HdfsServerConstants.BlockUCState.COMPLETE; -import static org.apache.hadoop.hdfs.server.common.HdfsServerConstants.BlockUCState.UNDER_CONSTRUCTION; - -/** - * Represents a striped block that is currently being constructed. - * This is usually the last block of a file opened for write or append. - */ -public class BlockInfoStripedUnderConstruction extends BlockInfoStriped - implements BlockInfoUnderConstruction{ - private BlockUCState blockUCState; - - /** - * Block replicas as assigned when the block was allocated. - */ - private ReplicaUnderConstruction[] replicas; - - /** - * Index of the primary data node doing the recovery. Useful for log - * messages. - */ - private int primaryNodeIndex = -1; - - /** - * The new generation stamp, which this block will have - * after the recovery succeeds. Also used as a recovery id to identify - * the right recovery if any of the abandoned recoveries re-appear. - */ - private long blockRecoveryId = 0; - - /** - * Constructor with null storage targets. 
- */ - public BlockInfoStripedUnderConstruction(Block blk, ErasureCodingPolicy ecPolicy) { - this(blk, ecPolicy, UNDER_CONSTRUCTION, null); - } - - /** - * Create a striped block that is currently being constructed. - */ - public BlockInfoStripedUnderConstruction(Block blk, ErasureCodingPolicy ecPolicy, - BlockUCState state, DatanodeStorageInfo[] targets) { - super(blk, ecPolicy); - assert getBlockUCState() != COMPLETE : - "BlockInfoStripedUnderConstruction cannot be in COMPLETE state"; - this.blockUCState = state; - setExpectedLocations(targets); - } - - @Override - public BlockInfoStriped convertToCompleteBlock() throws IOException { - assert getBlockUCState() != COMPLETE : - "Trying to convert a COMPLETE block"; - return new BlockInfoStriped(this); - } - - /** Set expected locations */ - @Override - public void setExpectedLocations(DatanodeStorageInfo[] targets) { - int numLocations = targets == null ? 0 : targets.length; - this.replicas = new ReplicaUnderConstruction[numLocations]; - for(int i = 0; i < numLocations; i++) { - // when creating a new block we simply sequentially assign block index to - // each storage - Block blk = new Block(this.getBlockId() + i, 0, this.getGenerationStamp()); - replicas[i] = new ReplicaUnderConstruction(blk, targets[i], - ReplicaState.RBW); - } - } - - /** - * Create array of expected replica locations - * (as has been assigned by chooseTargets()). 
- */ - @Override - public DatanodeStorageInfo[] getExpectedStorageLocations() { - int numLocations = getNumExpectedLocations(); - DatanodeStorageInfo[] storages = new DatanodeStorageInfo[numLocations]; - for (int i = 0; i < numLocations; i++) { - storages[i] = replicas[i].getExpectedStorageLocation(); - } - return storages; - } - - /** @return the index array indicating the block index in each storage */ - public int[] getBlockIndices() { - int numLocations = getNumExpectedLocations(); - int[] indices = new int[numLocations]; - for (int i = 0; i < numLocations; i++) { - indices[i] = BlockIdManager.getBlockIndex(replicas[i]); - } - return indices; - } - - @Override - public int getNumExpectedLocations() { - return replicas == null ? 0 : replicas.length; - } - - /** - * Return the state of the block under construction. - * @see BlockUCState - */ - @Override // BlockInfo - public BlockUCState getBlockUCState() { - return blockUCState; - } - - void setBlockUCState(BlockUCState s) { - blockUCState = s; - } - - @Override - public long getBlockRecoveryId() { - return blockRecoveryId; - } - - @Override - public Block getTruncateBlock() { - return null; - } - - @Override - public Block toBlock(){ - return this; - } - - @Override - public void setGenerationStampAndVerifyReplicas(long genStamp) { - // Set the generation stamp for the block. - setGenerationStamp(genStamp); - if (replicas == null) - return; - - // Remove the replicas with wrong gen stamp. - // The replica list is unchanged. 
- for (ReplicaUnderConstruction r : replicas) { - if (genStamp != r.getGenerationStamp()) { - r.getExpectedStorageLocation().removeBlock(this); - NameNode.blockStateChangeLog.info("BLOCK* Removing stale replica " - + "from location: {}", r.getExpectedStorageLocation()); - } - } - } - - @Override - public void commitBlock(Block block) throws IOException { - if (getBlockId() != block.getBlockId()) { - throw new IOException("Trying to commit inconsistent block: id = " - + block.getBlockId() + ", expected id = " + getBlockId()); - } - blockUCState = BlockUCState.COMMITTED; - this.set(getBlockId(), block.getNumBytes(), block.getGenerationStamp()); - // Sort out invalid replicas. - setGenerationStampAndVerifyReplicas(block.getGenerationStamp()); - } - - @Override - public void initializeBlockRecovery(long recoveryId) { - setBlockUCState(BlockUCState.UNDER_RECOVERY); - blockRecoveryId = recoveryId; - if (replicas == null || replicas.length == 0) { - NameNode.blockStateChangeLog.warn("BLOCK*" + - " BlockInfoStripedUnderConstruction.initLeaseRecovery:" + - " No blocks found, lease removed."); - // sets primary node index and return. - primaryNodeIndex = -1; - return; - } - boolean allLiveReplicasTriedAsPrimary = true; - for (ReplicaUnderConstruction replica : replicas) { - // Check if all replicas have been tried or not. - if (replica.isAlive()) { - allLiveReplicasTriedAsPrimary = (allLiveReplicasTriedAsPrimary && - replica.getChosenAsPrimary()); - } - } - if (allLiveReplicasTriedAsPrimary) { - // Just set all the replicas to be chosen whether they are alive or not. - for (ReplicaUnderConstruction replica : replicas) { - replica.setChosenAsPrimary(false); - } - } - long mostRecentLastUpdate = 0; - ReplicaUnderConstruction primary = null; - primaryNodeIndex = -1; - for(int i = 0; i < replicas.length; i++) { - // Skip alive replicas which have been chosen for recovery. 
- if (!(replicas[i].isAlive() && !replicas[i].getChosenAsPrimary())) { - continue; - } - final ReplicaUnderConstruction ruc = replicas[i]; - final long lastUpdate = ruc.getExpectedStorageLocation() - .getDatanodeDescriptor().getLastUpdateMonotonic(); - if (lastUpdate > mostRecentLastUpdate) { - primaryNodeIndex = i; - primary = ruc; - mostRecentLastUpdate = lastUpdate; - } - } - if (primary != null) { - primary.getExpectedStorageLocation().getDatanodeDescriptor() - .addBlockToBeRecovered(this); - primary.setChosenAsPrimary(true); - NameNode.blockStateChangeLog.info( - "BLOCK* {} recovery started, primary={}", this, primary); - } - } - - @Override - public void addReplicaIfNotPresent(DatanodeStorageInfo storage, - Block reportedBlock, ReplicaState rState) { - if (replicas == null) { - replicas = new ReplicaUnderConstruction[1]; - replicas[0] = new ReplicaUnderConstruction(reportedBlock, storage, rState); - } else { - for (int i = 0; i < replicas.length; i++) { - DatanodeStorageInfo expected = replicas[i].getExpectedStorageLocation(); - if (expected == storage) { - replicas[i].setBlockId(reportedBlock.getBlockId()); - replicas[i].setGenerationStamp(reportedBlock.getGenerationStamp()); - return; - } else if (expected != null && expected.getDatanodeDescriptor() == - storage.getDatanodeDescriptor()) { - // The Datanode reported that the block is on a different storage - // than the one chosen by BlockPlacementPolicy. This can occur as - // we allow Datanodes to choose the target storage. Update our - // state by removing the stale entry and adding a new one. 
- replicas[i] = new ReplicaUnderConstruction(reportedBlock, storage, - rState); - return; - } - } - ReplicaUnderConstruction[] newReplicas = - new ReplicaUnderConstruction[replicas.length + 1]; - System.arraycopy(replicas, 0, newReplicas, 0, replicas.length); - newReplicas[newReplicas.length - 1] = new ReplicaUnderConstruction( - reportedBlock, storage, rState); - replicas = newReplicas; - } - } - - @Override - public String toString() { - final StringBuilder b = new StringBuilder(100); - appendStringTo(b); - return b.toString(); - } - - @Override - public void appendStringTo(StringBuilder sb) { - super.appendStringTo(sb); - appendUCParts(sb); - } - - private void appendUCParts(StringBuilder sb) { - sb.append("{UCState=").append(blockUCState). - append(", primaryNodeIndex=").append(primaryNodeIndex). - append(", replicas=["); - if (replicas != null) { - int i = 0; - for (ReplicaUnderConstruction r : replicas) { - r.appendStringTo(sb); - if (++i < replicas.length) { - sb.append(", "); - } - } - } - sb.append("]}"); - } -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/164cbe64/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoUnderConstruction.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoUnderConstruction.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoUnderConstruction.java deleted file mode 100644 index 10a8cae..0000000 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoUnderConstruction.java +++ /dev/null @@ -1,84 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hdfs.server.blockmanagement; - -import java.io.IOException; - -import org.apache.hadoop.hdfs.protocol.Block; -import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.BlockUCState; -import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState; - -public interface BlockInfoUnderConstruction { - /** - * Create array of expected replica locations - * (as has been assigned by chooseTargets()). - */ - public DatanodeStorageInfo[] getExpectedStorageLocations(); - - /** Get recover block */ - public Block getTruncateBlock(); - - /** Convert to a Block object */ - public Block toBlock(); - - /** Get block recovery ID */ - public long getBlockRecoveryId(); - - /** Get the number of expected locations */ - public int getNumExpectedLocations(); - - /** Set expected locations */ - public void setExpectedLocations(DatanodeStorageInfo[] targets); - - /** - * Process the recorded replicas. When about to commit or finish the - * pipeline recovery sort out bad replicas. - * @param genStamp The final generation stamp for the block. - */ - public void setGenerationStampAndVerifyReplicas(long genStamp); - - /** - * Initialize lease recovery for this block. - * Find the first alive data-node starting from the previous primary and - * make it primary. 
- */ - public void initializeBlockRecovery(long recoveryId); - - /** Add the reported replica if it is not already in the replica list. */ - public void addReplicaIfNotPresent(DatanodeStorageInfo storage, - Block reportedBlock, ReplicaState rState); - - /** - * Commit block's length and generation stamp as reported by the client. - * Set block state to {@link BlockUCState#COMMITTED}. - * @param block - contains client reported block length and generation - * @throws IOException if block ids are inconsistent. - */ - public void commitBlock(Block block) throws IOException; - - /** - * Convert an under construction block to a complete block. - * - * @return a complete block. - * @throws IOException - * if the state of the block (the generation stamp and the length) - * has not been committed by the client or it does not have at least - * a minimal number of replicas reported from data-nodes. - */ - public BlockInfo convertToCompleteBlock() throws IOException; -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/164cbe64/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java index dfea5f3..ae08825 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java @@ -644,19 +644,13 @@ public class BlockManager implements BlockStatsMXBean { */ private static boolean commitBlock(final BlockInfo block, final Block commitBlock) throws IOException { - if (block instanceof BlockInfoUnderConstruction - && block.getBlockUCState() != 
BlockUCState.COMMITTED) { - final BlockInfoUnderConstruction uc = (BlockInfoUnderConstruction)block; - - assert block.getNumBytes() <= commitBlock.getNumBytes() : + if (block.getBlockUCState() == BlockUCState.COMMITTED) + return false; + assert block.getNumBytes() <= commitBlock.getNumBytes() : "commitBlock length is less than the stored one " - + commitBlock.getNumBytes() + " vs. " + block.getNumBytes(); - - uc.commitBlock(commitBlock); - return true; - } - - return false; + + commitBlock.getNumBytes() + " vs. " + block.getNumBytes(); + block.commitBlock(commitBlock); + return true; } /** @@ -713,9 +707,7 @@ public class BlockManager implements BlockStatsMXBean { "Cannot complete block: block has not been COMMITTED by the client"); } - final BlockInfo completeBlock - = !(curBlock instanceof BlockInfoUnderConstruction)? curBlock - : ((BlockInfoUnderConstruction)curBlock).convertToCompleteBlock(); + final BlockInfo completeBlock = curBlock.convertToCompleteBlock(); // replace penultimate block in file bc.setBlock(blkIndex, completeBlock); @@ -754,9 +746,7 @@ public class BlockManager implements BlockStatsMXBean { */ public BlockInfo forceCompleteBlock(final BlockCollection bc, final BlockInfo block) throws IOException { - if (block instanceof BlockInfoUnderConstruction) { - ((BlockInfoUnderConstruction)block).commitBlock(block); - } + block.commitBlock(block); return completeBlock(bc, block, true); } @@ -777,30 +767,28 @@ public class BlockManager implements BlockStatsMXBean { */ public LocatedBlock convertLastBlockToUnderConstruction( BlockCollection bc, long bytesToRemove) throws IOException { - BlockInfo oldBlock = bc.getLastBlock(); - if(oldBlock == null || - bc.getPreferredBlockSize() == oldBlock.getNumBytes() - bytesToRemove) + BlockInfo lastBlock = bc.getLastBlock(); + if(lastBlock == null || + bc.getPreferredBlockSize() == lastBlock.getNumBytes() - bytesToRemove) return null; - assert oldBlock == getStoredBlock(oldBlock) : + assert lastBlock == 
getStoredBlock(lastBlock) : "last block of the file is not in blocksMap"; - DatanodeStorageInfo[] targets = getStorages(oldBlock); + DatanodeStorageInfo[] targets = getStorages(lastBlock); - // convert the last block to UC - bc.convertLastBlockToUC(oldBlock, targets); - // get the new created uc block - BlockInfo ucBlock = bc.getLastBlock(); - blocksMap.replaceBlock(ucBlock); + // convert the last block to under construction. note no block replacement + // is happening + bc.convertLastBlockToUC(lastBlock, targets); // Remove block from replication queue. - NumberReplicas replicas = countNodes(ucBlock); - neededReplications.remove(ucBlock, replicas.liveReplicas(), - replicas.decommissionedAndDecommissioning(), getReplication(ucBlock)); - pendingReplications.remove(ucBlock); + NumberReplicas replicas = countNodes(lastBlock); + neededReplications.remove(lastBlock, replicas.liveReplicas(), + replicas.decommissionedAndDecommissioning(), getReplication(lastBlock)); + pendingReplications.remove(lastBlock); // remove this block from the list of pending blocks to be deleted. for (DatanodeStorageInfo storage : targets) { - final Block b = getBlockOnStorage(oldBlock, storage); + final Block b = getBlockOnStorage(lastBlock, storage); if (b != null) { invalidateBlocks.remove(storage.getDatanodeDescriptor(), b); } @@ -810,13 +798,15 @@ public class BlockManager implements BlockStatsMXBean { // count in safe-mode. namesystem.adjustSafeModeBlockTotals( // decrement safe if we had enough - hasMinStorage(oldBlock, targets.length) ? -1 : 0, + hasMinStorage(lastBlock, targets.length) ? 
-1 : 0, // always decrement total blocks -1); - final long fileLength = bc.computeContentSummary(getStoragePolicySuite()).getLength(); - final long pos = fileLength - ucBlock.getNumBytes(); - return createLocatedBlock(ucBlock, pos, BlockTokenIdentifier.AccessMode.WRITE); + final long fileLength = bc.computeContentSummary( + getStoragePolicySuite()).getLength(); + final long pos = fileLength - lastBlock.getNumBytes(); + return createLocatedBlock(lastBlock, pos, + BlockTokenIdentifier.AccessMode.WRITE); } /** @@ -895,18 +885,14 @@ public class BlockManager implements BlockStatsMXBean { private LocatedBlock createLocatedBlock(final BlockInfo blk, final long pos) throws IOException { if (!blk.isComplete()) { + final BlockUnderConstructionFeature uc = blk.getUnderConstructionFeature(); if (blk.isStriped()) { - final BlockInfoStripedUnderConstruction uc = - (BlockInfoStripedUnderConstruction) blk; final DatanodeStorageInfo[] storages = uc.getExpectedStorageLocations(); final ExtendedBlock eb = new ExtendedBlock(namesystem.getBlockPoolId(), blk); return newLocatedStripedBlock(eb, storages, uc.getBlockIndices(), pos, false); } else { - assert blk instanceof BlockInfoContiguousUnderConstruction; - final BlockInfoContiguousUnderConstruction uc = - (BlockInfoContiguousUnderConstruction) blk; final DatanodeStorageInfo[] storages = uc.getExpectedStorageLocations(); final ExtendedBlock eb = new ExtendedBlock(namesystem.getBlockPoolId(), blk); @@ -1923,9 +1909,7 @@ public class BlockManager implements BlockStatsMXBean { StatefulBlockInfo(BlockInfo storedBlock, Block reportedBlock, ReplicaState reportedState) { - Preconditions.checkArgument( - storedBlock instanceof BlockInfoContiguousUnderConstruction || - storedBlock instanceof BlockInfoStripedUnderConstruction); + Preconditions.checkArgument(!storedBlock.isComplete()); this.storedBlock = storedBlock; this.reportedBlock = reportedBlock; this.reportedState = reportedState; @@ -2335,13 +2319,14 @@ public class BlockManager 
implements BlockStatsMXBean { // If block is under construction, add this replica to its list if (isBlockUnderConstruction(storedBlock, ucState, reportedState)) { - final BlockInfoUnderConstruction uc = (BlockInfoUnderConstruction)storedBlock; - uc.addReplicaIfNotPresent(storageInfo, iblk, reportedState); + storedBlock.getUnderConstructionFeature() + .addReplicaIfNotPresent(storageInfo, iblk, reportedState); // OpenFileBlocks only inside snapshots also will be added to safemode // threshold. So we need to update such blocks to safemode // refer HDFS-5283 if (namesystem.isInSnapshot(storedBlock.getBlockCollection())) { - int numOfReplicas = uc.getNumExpectedLocations(); + int numOfReplicas = storedBlock.getUnderConstructionFeature() + .getNumExpectedLocations(); namesystem.incrementSafeBlockCount(numOfReplicas, storedBlock); } //and fall through to next clause @@ -2469,11 +2454,6 @@ public class BlockManager implements BlockStatsMXBean { // Ignore replicas already scheduled to be removed from the DN if(invalidateBlocks.contains(dn, block)) { - /* - * TODO: following assertion is incorrect, see HDFS-2668 assert - * storedBlock.findDatanode(dn) < 0 : "Block " + block + - * " in recentInvalidatesSet should not appear in DN " + dn; - */ return storedBlock; } @@ -2704,9 +2684,8 @@ public class BlockManager implements BlockStatsMXBean { void addStoredBlockUnderConstruction(StatefulBlockInfo ucBlock, DatanodeStorageInfo storageInfo) throws IOException { BlockInfo block = ucBlock.storedBlock; - final BlockInfoUnderConstruction uc = (BlockInfoUnderConstruction)block; - uc.addReplicaIfNotPresent(storageInfo, ucBlock.reportedBlock, - ucBlock.reportedState); + block.getUnderConstructionFeature().addReplicaIfNotPresent(storageInfo, + ucBlock.reportedBlock, ucBlock.reportedState); if (ucBlock.reportedState == ReplicaState.FINALIZED && (block.findStorageInfo(storageInfo) < 0)) { @@ -2766,8 +2745,7 @@ public class BlockManager implements BlockStatsMXBean { assert block != null && 
namesystem.hasWriteLock(); BlockInfo storedBlock; DatanodeDescriptor node = storageInfo.getDatanodeDescriptor(); - if (block instanceof BlockInfoContiguousUnderConstruction || - block instanceof BlockInfoStripedUnderConstruction) { + if (!block.isComplete()) { //refresh our copy in case the block got completed in another thread storedBlock = getStoredBlock(block); } else { @@ -4221,7 +4199,7 @@ public class BlockManager implements BlockStatsMXBean { final LocatedBlock lb; if (info.isStriped()) { lb = newLocatedStripedBlock(eb, locs, - ((BlockInfoStripedUnderConstruction)info).getBlockIndices(), + info.getUnderConstructionFeature().getBlockIndices(), offset, false); } else { lb = newLocatedBlock(eb, locs, offset, false); http://git-wip-us.apache.org/repos/asf/hadoop/blob/164cbe64/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockUnderConstructionFeature.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockUnderConstructionFeature.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockUnderConstructionFeature.java new file mode 100644 index 0000000..58b455e --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockUnderConstructionFeature.java @@ -0,0 +1,269 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.blockmanagement; + +import org.apache.hadoop.hdfs.protocol.Block; +import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.BlockUCState; +import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState; +import org.apache.hadoop.hdfs.server.namenode.NameNode; + +import static org.apache.hadoop.hdfs.server.common.HdfsServerConstants.BlockUCState.COMPLETE; + +/** + * Represents the under construction feature of a Block. + * This is usually the last block of a file opened for write or append. + */ +public class BlockUnderConstructionFeature { + private BlockUCState blockUCState; + + /** + * Block replicas as assigned when the block was allocated. + */ + private ReplicaUnderConstruction[] replicas; + + /** + * Index of the primary data node doing the recovery. Useful for log + * messages. + */ + private int primaryNodeIndex = -1; + + /** + * The new generation stamp, which this block will have + * after the recovery succeeds. Also used as a recovery id to identify + * the right recovery if any of the abandoned recoveries re-appear. + */ + private long blockRecoveryId = 0; + + /** + * The block source to use in the event of copy-on-write truncate. 
+   */
+  private Block truncateBlock;
+
+  /**
+   * Create the under-construction state for a block.
+   *
+   * @param blk the block this feature is created for; used to build the
+   *            per-replica blocks in
+   *            {@link #setExpectedLocations(Block, DatanodeStorageInfo[], boolean)}
+   * @param state the initial under-construction state; must not be
+   *              {@link BlockUCState#COMPLETE}
+   * @param targets the expected storage locations, may be null
+   * @param isStriped true if the block is a striped block
+   */
+  public BlockUnderConstructionFeature(Block blk,
+      BlockUCState state, DatanodeStorageInfo[] targets, boolean isStriped) {
+    // Validate the requested state itself. The blockUCState field has not
+    // been assigned yet at this point (it is still null), so asserting on
+    // getBlockUCState() could never fail.
+    assert state != COMPLETE :
+        "BlockUnderConstructionFeature cannot be in COMPLETE state";
+    this.blockUCState = state;
+    setExpectedLocations(blk, targets, isStriped);
+  }
+
+  /**
+   * Set expected locations. Any previously recorded replicas are discarded
+   * and replaced by one {@link ReplicaState#RBW} entry per target.
+   *
+   * @param block the block the replicas belong to
+   * @param targets the expected storage locations; null is treated as empty
+   * @param isStriped if true, the i-th target is given its own Block built
+   *                  from (block id + i, 0, generation stamp of block);
+   *                  otherwise every replica refers to {@code block} itself
+   */
+  public void setExpectedLocations(Block block, DatanodeStorageInfo[] targets,
+      boolean isStriped) {
+    int numLocations = targets == null ? 0 : targets.length;
+    this.replicas = new ReplicaUnderConstruction[numLocations];
+    for(int i = 0; i < numLocations; i++) {
+      // when creating a new striped block we simply sequentially assign block
+      // index to each storage
+      Block replicaBlock = isStriped ?
+          new Block(block.getBlockId() + i, 0, block.getGenerationStamp()) :
+          block;
+      replicas[i] = new ReplicaUnderConstruction(replicaBlock, targets[i],
+          ReplicaState.RBW);
+    }
+  }
+
+  /**
+   * Create array of expected replica locations
+   * (as has been assigned by chooseTargets()).
+   *
+   * @return a newly allocated array with the expected storage of every
+   *         recorded replica; empty if no replicas are recorded
+   */
+  public DatanodeStorageInfo[] getExpectedStorageLocations() {
+    int numLocations = getNumExpectedLocations();
+    DatanodeStorageInfo[] storages = new DatanodeStorageInfo[numLocations];
+    for (int i = 0; i < numLocations; i++) {
+      storages[i] = replicas[i].getExpectedStorageLocation();
+    }
+    return storages;
+  }
+
+  /**
+   * @return the index array indicating the block index in each storage. Used
+   * only by striped blocks. Each entry is obtained from
+   * {@link BlockIdManager#getBlockIndex} on the corresponding replica.
+   */
+  public int[] getBlockIndices() {
+    int numLocations = getNumExpectedLocations();
+    int[] indices = new int[numLocations];
+    for (int i = 0; i < numLocations; i++) {
+      indices[i] = BlockIdManager.getBlockIndex(replicas[i]);
+    }
+    return indices;
+  }
+
+  /**
+   * @return the number of recorded replicas, or 0 if the replica array is
+   *         null
+   */
+  public int getNumExpectedLocations() {
+    return replicas == null ? 0 : replicas.length;
+  }
+
+  /**
+   * Return the state of the block under construction.
+   * @see BlockUCState
+   */
+  public BlockUCState getBlockUCState() {
+    return blockUCState;
+  }
+
+  /** Overwrite the under-construction state with {@code s}. */
+  void setBlockUCState(BlockUCState s) {
+    blockUCState = s;
+  }
+
+  /**
+   * @return the recovery id recorded by the last call to
+   *         {@link #initializeBlockRecovery}; 0 if recovery was never
+   *         initialized
+   */
+  public long getBlockRecoveryId() {
+    return blockRecoveryId;
+  }
+
+  /** Get recover block */
+  public Block getTruncateBlock() {
+    return truncateBlock;
+  }
+
+  /** Set the block returned by {@link #getTruncateBlock()}. */
+  public void setTruncateBlock(Block recoveryBlock) {
+    this.truncateBlock = recoveryBlock;
+  }
+
+  /**
+   * Set {@link #blockUCState} to {@link BlockUCState#COMMITTED}.
+   */
+  void commit() {
+    blockUCState = BlockUCState.COMMITTED;
+  }
+
+  /**
+   * For every recorded replica whose generation stamp differs from the
+   * generation stamp of {@code block}, remove {@code block} from that
+   * replica's expected storage and log the removal at debug level.
+   * Does nothing if no replica array is recorded.
+   *
+   * @param block the block whose generation stamp is the reference value
+   */
+  void removeStaleReplicas(BlockInfo block) {
+    final long genStamp = block.getGenerationStamp();
+    if (replicas != null) {
+      // Remove replicas with wrong gen stamp. The replica list is unchanged.
+      for (ReplicaUnderConstruction r : replicas) {
+        if (genStamp != r.getGenerationStamp()) {
+          r.getExpectedStorageLocation().removeBlock(block);
+          NameNode.blockStateChangeLog.debug("BLOCK* Removing stale replica "
+              + "from location: {}", r.getExpectedStorageLocation());
+        }
+      }
+    }
+  }
+
+  /**
+   * Initialize lease recovery for this block.
+   * Moves the state to {@link BlockUCState#UNDER_RECOVERY}, records the
+   * recovery id, and picks a primary among the replicas that are alive and
+   * have not yet been chosen as primary: the one whose datanode reports the
+   * largest last-update time wins. If every alive replica has already been
+   * chosen, the chosen flags of all replicas are cleared first so that the
+   * selection starts over. The block is then queued for recovery on the
+   * primary's datanode. If no replica qualifies, primaryNodeIndex stays -1
+   * and nothing is queued.
+   *
+   * @param blockInfo the block to queue for recovery on the chosen datanode
+   * @param recoveryId the id recorded as the block recovery id
+   */
+  public void initializeBlockRecovery(BlockInfo blockInfo, long recoveryId) {
+    setBlockUCState(BlockUCState.UNDER_RECOVERY);
+    blockRecoveryId = recoveryId;
+    if (replicas == null || replicas.length == 0) {
+      NameNode.blockStateChangeLog.warn("BLOCK*"
+          + " BlockUnderConstructionFeature.initLeaseRecovery:"
+          + " No blocks found, lease removed.");
+      // sets primary node index and return.
+      primaryNodeIndex = -1;
+      return;
+    }
+    boolean allLiveReplicasTriedAsPrimary = true;
+    for (ReplicaUnderConstruction replica : replicas) {
+      // Check if all replicas have been tried or not.
+      if (replica.isAlive()) {
+        allLiveReplicasTriedAsPrimary = (allLiveReplicasTriedAsPrimary &&
+            replica.getChosenAsPrimary());
+      }
+    }
+    if (allLiveReplicasTriedAsPrimary) {
+      // Every alive replica has had its turn: clear the chosen flag on all
+      // replicas, alive or not, so each one is eligible again.
+      for (ReplicaUnderConstruction replica : replicas) {
+        replica.setChosenAsPrimary(false);
+      }
+    }
+    // A candidate is only accepted if its last-update time is greater than 0.
+    long mostRecentLastUpdate = 0;
+    ReplicaUnderConstruction primary = null;
+    primaryNodeIndex = -1;
+    for(int i = 0; i < replicas.length; i++) {
+      // Skip replicas that are not alive or that have already been chosen
+      // as primary.
+      if (!(replicas[i].isAlive() && !replicas[i].getChosenAsPrimary())) {
+        continue;
+      }
+      final ReplicaUnderConstruction ruc = replicas[i];
+      final long lastUpdate = ruc.getExpectedStorageLocation()
+          .getDatanodeDescriptor().getLastUpdateMonotonic();
+      if (lastUpdate > mostRecentLastUpdate) {
+        primaryNodeIndex = i;
+        primary = ruc;
+        mostRecentLastUpdate = lastUpdate;
+      }
+    }
+    if (primary != null) {
+      primary.getExpectedStorageLocation().getDatanodeDescriptor()
+          .addBlockToBeRecovered(blockInfo);
+      primary.setChosenAsPrimary(true);
+      // NOTE(review): `this` is the feature, not blockInfo, so the first
+      // placeholder prints this object's toString() - confirm that the
+      // block itself does not need to be identified in this message.
+      NameNode.blockStateChangeLog.info(
+          "BLOCK* {} recovery started, primary={}", this, primary);
+    }
+  }
+
+  /** Add the reported replica if it is not already in the replica list.
*/
+  void addReplicaIfNotPresent(DatanodeStorageInfo storage,
+      Block reportedBlock, ReplicaState rState) {
+    if (replicas == null) {
+      // Nothing recorded yet: start the list with this single replica.
+      replicas = new ReplicaUnderConstruction[1];
+      replicas[0] = new ReplicaUnderConstruction(reportedBlock, storage, rState);
+    } else {
+      for (int i = 0; i < replicas.length; i++) {
+        DatanodeStorageInfo expected = replicas[i].getExpectedStorageLocation();
+        if (expected == storage) {
+          // Same storage object already recorded: only refresh the block id
+          // and generation stamp; rState is not applied in this case.
+          replicas[i].setBlockId(reportedBlock.getBlockId());
+          replicas[i].setGenerationStamp(reportedBlock.getGenerationStamp());
+          return;
+        } else if (expected != null && expected.getDatanodeDescriptor() ==
+            storage.getDatanodeDescriptor()) {
+          // The Datanode reported that the block is on a different storage
+          // than the one chosen by BlockPlacementPolicy. This can occur as
+          // we allow Datanodes to choose the target storage. Update our
+          // state by removing the stale entry and adding a new one.
+          replicas[i] = new ReplicaUnderConstruction(reportedBlock, storage,
+              rState);
+          return;
+        }
+      }
+      // No entry matched by storage or by datanode: grow the array by one
+      // and append the reported replica at the end.
+      ReplicaUnderConstruction[] newReplicas =
+          new ReplicaUnderConstruction[replicas.length + 1];
+      System.arraycopy(replicas, 0, newReplicas, 0, replicas.length);
+      newReplicas[newReplicas.length - 1] = new ReplicaUnderConstruction(
+          reportedBlock, storage, rState);
+      replicas = newReplicas;
+    }
+  }
+
+  /**
+   * @return the text built by appendUCParts: the under-construction state,
+   *         the truncate block, the primary node index and the replicas
+   */
+  @Override
+  public String toString() {
+    final StringBuilder b = new StringBuilder(100);
+    appendUCParts(b);
+    return b.toString();
+  }
+
+  /**
+   * Append "{UCState=..., truncateBlock=..., primaryNodeIndex=...,
+   * replicas=[...]}" to {@code sb}. Replicas are written with their own
+   * appendStringTo and separated by ", "; a null replica array is written
+   * as an empty list.
+   */
+  private void appendUCParts(StringBuilder sb) {
+    sb.append("{UCState=").append(blockUCState)
+      .append(", truncateBlock=").append(truncateBlock)
+      .append(", primaryNodeIndex=").append(primaryNodeIndex)
+      .append(", replicas=[");
+    if (replicas != null) {
+      int i = 0;
+      for (ReplicaUnderConstruction r : replicas) {
+        r.appendStringTo(sb);
+        // Separator after every element except the last one.
+        if (++i < replicas.length) {
+          sb.append(", ");
+        }
+      }
+    }
+    sb.append("]}");
+  }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/164cbe64/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java index 87394f6..a4d5442 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java @@ -227,7 +227,7 @@ public class DatanodeDescriptor extends DatanodeInfo { private final BlockQueue<BlockECRecoveryInfo> erasurecodeBlocks = new BlockQueue<>(); /** A queue of blocks to be recovered by this datanode */ - private final BlockQueue<BlockInfoUnderConstruction> recoverBlocks = + private final BlockQueue<BlockInfo> recoverBlocks = new BlockQueue<>(); /** A set of blocks to be invalidated by this datanode */ private final LightWeightHashSet<Block> invalidateBlocks = @@ -624,7 +624,7 @@ public class DatanodeDescriptor extends DatanodeInfo { /** * Store block recovery work. 
*/ - void addBlockToBeRecovered(BlockInfoUnderConstruction block) { + void addBlockToBeRecovered(BlockInfo block) { if(recoverBlocks.contains(block)) { // this prevents adding the same block twice to the recovery queue BlockManager.LOG.info(block + " is already in the recovery queue"); @@ -678,11 +678,11 @@ public class DatanodeDescriptor extends DatanodeInfo { return erasurecodeBlocks.poll(maxTransfers); } - public BlockInfoUnderConstruction[] getLeaseRecoveryCommand(int maxTransfers) { - List<BlockInfoUnderConstruction> blocks = recoverBlocks.poll(maxTransfers); + public BlockInfo[] getLeaseRecoveryCommand(int maxTransfers) { + List<BlockInfo> blocks = recoverBlocks.poll(maxTransfers); if(blocks == null) return null; - return blocks.toArray(new BlockInfoUnderConstruction[blocks.size()]); + return blocks.toArray(new BlockInfo[blocks.size()]); } /** http://git-wip-us.apache.org/repos/asf/hadoop/blob/164cbe64/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java index dbd07d4..8d2e750 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java @@ -1381,13 +1381,15 @@ public class DatanodeManager { } //check lease recovery - BlockInfoUnderConstruction[] blocks = nodeinfo - .getLeaseRecoveryCommand(Integer.MAX_VALUE); + BlockInfo[] blocks = nodeinfo.getLeaseRecoveryCommand(Integer.MAX_VALUE); if (blocks != null) { BlockRecoveryCommand brCommand = new BlockRecoveryCommand( blocks.length); - for (BlockInfoUnderConstruction b 
: blocks) { - final DatanodeStorageInfo[] storages = b.getExpectedStorageLocations(); + for (BlockInfo b : blocks) { + final BlockUnderConstructionFeature uc = + b.getUnderConstructionFeature(); + assert uc != null; + final DatanodeStorageInfo[] storages = uc.getExpectedStorageLocations(); // Skip stale nodes during recovery - not heart beated for some time (30s by default). final List<DatanodeStorageInfo> recoveryLocations = new ArrayList<>(storages.length); @@ -1398,12 +1400,12 @@ public class DatanodeManager { } // If we are performing a truncate recovery than set recovery fields // to old block. - boolean truncateRecovery = b.getTruncateBlock() != null; + boolean truncateRecovery = uc.getTruncateBlock() != null; boolean copyOnTruncateRecovery = truncateRecovery && - b.getTruncateBlock().getBlockId() != b.toBlock().getBlockId(); + uc.getTruncateBlock().getBlockId() != b.getBlockId(); ExtendedBlock primaryBlock = (copyOnTruncateRecovery) ? - new ExtendedBlock(blockPoolId, b.getTruncateBlock()) : - new ExtendedBlock(blockPoolId, b.toBlock()); + new ExtendedBlock(blockPoolId, uc.getTruncateBlock()) : + new ExtendedBlock(blockPoolId, b); // If we only get 1 replica after eliminating stale nodes, then choose all // replicas for recovery and let the primary data node handle failures. DatanodeInfo[] recoveryInfos; @@ -1420,13 +1422,13 @@ public class DatanodeManager { recoveryInfos = DatanodeStorageInfo.toDatanodeInfos(storages); } if(truncateRecovery) { - Block recoveryBlock = (copyOnTruncateRecovery) ? b.toBlock() : - b.getTruncateBlock(); + Block recoveryBlock = (copyOnTruncateRecovery) ? 
b : + uc.getTruncateBlock(); brCommand.add(new RecoveringBlock(primaryBlock, recoveryInfos, recoveryBlock)); } else { brCommand.add(new RecoveringBlock(primaryBlock, recoveryInfos, - b.getBlockRecoveryId())); + uc.getBlockRecoveryId())); } } return new DatanodeCommand[] { brCommand }; http://git-wip-us.apache.org/repos/asf/hadoop/blob/164cbe64/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirTruncateOp.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirTruncateOp.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirTruncateOp.java index 215a761..3d5da8e 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirTruncateOp.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirTruncateOp.java @@ -28,8 +28,9 @@ import org.apache.hadoop.hdfs.protocol.HdfsFileStatus; import org.apache.hadoop.hdfs.protocol.QuotaExceededException; import org.apache.hadoop.hdfs.protocol.SnapshotAccessControlException; import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo; -import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoContiguousUnderConstruction; +import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoContiguous; import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager; +import org.apache.hadoop.hdfs.server.blockmanagement.BlockUnderConstructionFeature; import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.BlockUCState; import org.apache.hadoop.hdfs.server.namenode.FSNamesystem.RecoverLeaseOp; import org.apache.hadoop.hdfs.server.namenode.INode.BlocksMapUpdateInfo; @@ -102,7 +103,7 @@ final class FSDirTruncateOp { final BlockInfo last = file.getLastBlock(); if (last != null && last.getBlockUCState() == BlockUCState.UNDER_RECOVERY) { - final 
Block truncatedBlock = ((BlockInfoContiguousUnderConstruction) last) + final Block truncatedBlock = last.getUnderConstructionFeature() .getTruncateBlock(); if (truncatedBlock != null) { final long truncateLength = file.computeFileSize(false, false) @@ -231,43 +232,42 @@ final class FSDirTruncateOp { oldBlock))); } - BlockInfoContiguousUnderConstruction truncatedBlockUC; + final BlockInfo truncatedBlockUC; BlockManager blockManager = fsn.getFSDirectory().getBlockManager(); if (shouldCopyOnTruncate) { // Add new truncateBlock into blocksMap and // use oldBlock as a source for copy-on-truncate recovery - truncatedBlockUC = new BlockInfoContiguousUnderConstruction(newBlock, + truncatedBlockUC = new BlockInfoContiguous(newBlock, file.getPreferredBlockReplication()); + truncatedBlockUC.convertToBlockUnderConstruction( + BlockUCState.UNDER_CONSTRUCTION, blockManager.getStorages(oldBlock)); truncatedBlockUC.setNumBytes(oldBlock.getNumBytes() - lastBlockDelta); - truncatedBlockUC.setTruncateBlock(oldBlock); - file.convertLastBlockToUC(truncatedBlockUC, - blockManager.getStorages(oldBlock)); + truncatedBlockUC.getUnderConstructionFeature().setTruncateBlock(oldBlock); + file.setLastBlock(truncatedBlockUC); blockManager.addBlockCollection(truncatedBlockUC, file); NameNode.stateChangeLog.debug( "BLOCK* prepareFileForTruncate: Scheduling copy-on-truncate to new" + " size {} new block {} old block {}", - truncatedBlockUC.getNumBytes(), newBlock, - truncatedBlockUC.getTruncateBlock()); + truncatedBlockUC.getNumBytes(), newBlock, oldBlock); } else { // Use new generation stamp for in-place truncate recovery blockManager.convertLastBlockToUnderConstruction(file, lastBlockDelta); oldBlock = file.getLastBlock(); assert !oldBlock.isComplete() : "oldBlock should be under construction"; - truncatedBlockUC = (BlockInfoContiguousUnderConstruction) oldBlock; - truncatedBlockUC.setTruncateBlock(new Block(oldBlock)); - truncatedBlockUC.getTruncateBlock().setNumBytes( - oldBlock.getNumBytes() 
- lastBlockDelta); - truncatedBlockUC.getTruncateBlock().setGenerationStamp( - newBlock.getGenerationStamp()); - - NameNode.stateChangeLog.debug( - "BLOCK* prepareFileForTruncate: {} Scheduling in-place block " - + "truncate to new size {}", truncatedBlockUC.getTruncateBlock() - .getNumBytes(), truncatedBlockUC); + BlockUnderConstructionFeature uc = oldBlock.getUnderConstructionFeature(); + uc.setTruncateBlock(new Block(oldBlock)); + uc.getTruncateBlock().setNumBytes(oldBlock.getNumBytes() - lastBlockDelta); + uc.getTruncateBlock().setGenerationStamp(newBlock.getGenerationStamp()); + truncatedBlockUC = oldBlock; + + NameNode.stateChangeLog.debug("BLOCK* prepareFileForTruncate: " + + "{} Scheduling in-place block truncate to new size {}", + uc, uc.getTruncateBlock().getNumBytes()); } if (shouldRecoverNow) { - truncatedBlockUC.initializeBlockRecovery(newBlock.getGenerationStamp()); + truncatedBlockUC.getUnderConstructionFeature().initializeBlockRecovery( + truncatedBlockUC, newBlock.getGenerationStamp()); } return newBlock; http://git-wip-us.apache.org/repos/asf/hadoop/blob/164cbe64/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirWriteFileOp.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirWriteFileOp.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirWriteFileOp.java index ffd8fbc..68aef76 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirWriteFileOp.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirWriteFileOp.java @@ -45,10 +45,10 @@ import org.apache.hadoop.hdfs.protocol.LocatedBlock; import org.apache.hadoop.hdfs.protocol.QuotaExceededException; import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier; import 
org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo; -import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoContiguousUnderConstruction; -import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoStripedUnderConstruction; -import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoUnderConstruction; +import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoContiguous; +import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoStriped; import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager; +import org.apache.hadoop.hdfs.server.blockmanagement.BlockUnderConstructionFeature; import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeStorageInfo; import org.apache.hadoop.hdfs.server.common.HdfsServerConstants; import org.apache.hadoop.hdfs.server.namenode.snapshot.Snapshot; @@ -77,7 +77,7 @@ class FSDirWriteFileOp { Block block) throws IOException { // modify file-> block and blocksMap // fileNode should be under construction - BlockInfoUnderConstruction uc = fileNode.removeLastBlock(block); + BlockInfo uc = fileNode.removeLastBlock(block); if (uc == null) { return false; } @@ -214,8 +214,8 @@ class FSDirWriteFileOp { static LocatedBlock makeLocatedBlock(FSNamesystem fsn, BlockInfo blk, DatanodeStorageInfo[] locs, long offset) throws IOException { - LocatedBlock lBlk = BlockManager.newLocatedBlock(fsn.getExtendedBlock(blk), - blk, locs, offset); + LocatedBlock lBlk = BlockManager.newLocatedBlock( + fsn.getExtendedBlock(new Block(blk)), blk, locs, offset); fsn.getBlockManager().setBlockToken(lBlk, BlockTokenIdentifier.AccessMode.WRITE); return lBlk; @@ -247,8 +247,8 @@ class FSDirWriteFileOp { } else { // add new chosen targets to already allocated block and return BlockInfo lastBlockInFile = pendingFile.getLastBlock(); - ((BlockInfoContiguousUnderConstruction) lastBlockInFile) - .setExpectedLocations(targets); + lastBlockInFile.getUnderConstructionFeature().setExpectedLocations( + lastBlockInFile, targets, 
pendingFile.isStriped()); offset = pendingFile.computeFileSize(); return makeLocatedBlock(fsn, lastBlockInFile, targets, offset); } @@ -542,7 +542,8 @@ class FSDirWriteFileOp { // check quota limits and updated space consumed fsd.updateCount(inodesInPath, 0, fileINode.getPreferredBlockSize(), numLocations, true); - blockInfo = new BlockInfoStripedUnderConstruction(block, ecPolicy, + blockInfo = new BlockInfoStriped(block, ecPolicy); + blockInfo.convertToBlockUnderConstruction( HdfsServerConstants.BlockUCState.UNDER_CONSTRUCTION, targets); } else { // check quota limits and updated space consumed @@ -550,9 +551,9 @@ class FSDirWriteFileOp { fileINode.getPreferredBlockReplication(), true); short numLocations = fileINode.getFileReplication(); - blockInfo = new BlockInfoContiguousUnderConstruction(block, - numLocations, HdfsServerConstants.BlockUCState.UNDER_CONSTRUCTION, - targets); + blockInfo = new BlockInfoContiguous(block, numLocations); + blockInfo.convertToBlockUnderConstruction( + HdfsServerConstants.BlockUCState.UNDER_CONSTRUCTION, targets); } fsd.getBlockManager().addBlockCollection(blockInfo, fileINode); fileINode.addBlock(blockInfo); @@ -692,10 +693,10 @@ class FSDirWriteFileOp { "allocation of a new block in " + src + ". 
Returning previously" + " allocated block " + lastBlockInFile); long offset = file.computeFileSize(); - BlockInfoUnderConstruction lastBlockUC = - (BlockInfoUnderConstruction) lastBlockInFile; + BlockUnderConstructionFeature uc = + lastBlockInFile.getUnderConstructionFeature(); onRetryBlock[0] = makeLocatedBlock(fsn, lastBlockInFile, - lastBlockUC.getExpectedStorageLocations(), offset); + uc.getExpectedStorageLocations(), offset); return new FileState(file, src, iip); } else { // Case 3 http://git-wip-us.apache.org/repos/asf/hadoop/blob/164cbe64/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java index 008a327..a61161f 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java @@ -42,15 +42,14 @@ import org.apache.hadoop.hdfs.protocol.LocatedBlock; import org.apache.hadoop.hdfs.server.blockmanagement.BlockIdManager; import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo; import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoStriped; -import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoStripedUnderConstruction; import org.apache.hadoop.hdfs.protocol.Block; import org.apache.hadoop.hdfs.protocol.CacheDirectiveInfo; import org.apache.hadoop.hdfs.protocol.HdfsFileStatus; import org.apache.hadoop.hdfs.protocol.LastBlockWithStatus; import org.apache.hadoop.hdfs.protocol.LayoutVersion; import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoContiguous; -import 
org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoContiguousUnderConstruction; import org.apache.hadoop.hdfs.server.common.HdfsServerConstants; +import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.BlockUCState; import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.RollingUpgradeStartupOption; import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.StartupOption; import org.apache.hadoop.hdfs.server.common.Storage; @@ -991,12 +990,14 @@ public class FSEditLogLoader { final BlockInfo newBlockInfo; boolean isStriped = ecZone != null; if (isStriped) { - newBlockInfo = new BlockInfoStripedUnderConstruction(newBlock, + newBlockInfo = new BlockInfoStriped(newBlock, ecZone.getErasureCodingPolicy()); } else { - newBlockInfo = new BlockInfoContiguousUnderConstruction(newBlock, + newBlockInfo = new BlockInfoContiguous(newBlock, file.getPreferredBlockReplication()); } + newBlockInfo.convertToBlockUnderConstruction( + BlockUCState.UNDER_CONSTRUCTION, null); fsNamesys.getBlockManager().addBlockCollectionWithCheck(newBlockInfo, file); file.addBlock(newBlockInfo); fsNamesys.getBlockManager().processQueuedMessagesForBlock(newBlock); @@ -1077,12 +1078,14 @@ public class FSEditLogLoader { // what about an old-version fsync() where fsync isn't called // until several blocks in? if (isStriped) { - newBI = new BlockInfoStripedUnderConstruction(newBlock, + newBI = new BlockInfoStriped(newBlock, ecZone.getErasureCodingPolicy()); } else { - newBI = new BlockInfoContiguousUnderConstruction(newBlock, + newBI = new BlockInfoContiguous(newBlock, file.getPreferredBlockReplication()); } + newBI.convertToBlockUnderConstruction(BlockUCState.UNDER_CONSTRUCTION, + null); } else { // OP_CLOSE should add finalized blocks. 
This code path // is only executed when loading edits written by prior http://git-wip-us.apache.org/repos/asf/hadoop/blob/164cbe64/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormat.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormat.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormat.java index e7c87d6..0b1902f 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormat.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormat.java @@ -55,7 +55,6 @@ import org.apache.hadoop.hdfs.protocol.LayoutVersion; import org.apache.hadoop.hdfs.protocol.LayoutVersion.Feature; import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo; import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoContiguous; -import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoContiguousUnderConstruction; import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager; import org.apache.hadoop.hdfs.server.common.HdfsServerConstants; import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.StartupOption; @@ -756,7 +755,7 @@ public class FSImageFormat { // file // read blocks - Block[] blocks = new BlockInfoContiguous[numBlocks]; + BlockInfo[] blocks = new BlockInfoContiguous[numBlocks]; for (int j = 0; j < numBlocks; j++) { blocks[j] = new BlockInfoContiguous(replication); blocks[j].readFields(in); @@ -778,9 +777,9 @@ public class FSImageFormat { clientMachine = FSImageSerialization.readString(in); // convert the last block to BlockUC if (blocks.length > 0) { - Block lastBlk = blocks[blocks.length - 1]; - blocks[blocks.length - 1] = - new BlockInfoContiguousUnderConstruction(lastBlk, replication); + BlockInfo lastBlk = 
blocks[blocks.length - 1]; + lastBlk.convertToBlockUnderConstruction( + HdfsServerConstants.BlockUCState.UNDER_CONSTRUCTION, null); } } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/164cbe64/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormatPBINode.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormatPBINode.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormatPBINode.java index 51b04d0..ffaf86b 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormatPBINode.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormatPBINode.java @@ -45,10 +45,9 @@ import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.BlockProto; import org.apache.hadoop.hdfs.protocolPB.PBHelper; import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo; import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoContiguous; -import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoContiguousUnderConstruction; import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoStriped; -import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoStripedUnderConstruction; import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager; +import org.apache.hadoop.hdfs.server.common.HdfsServerConstants; import org.apache.hadoop.hdfs.server.namenode.FSImageFormatProtobuf.LoaderContext; import org.apache.hadoop.hdfs.server.namenode.FSImageFormatProtobuf.SaverContext; import org.apache.hadoop.hdfs.server.namenode.FsImageProto.FileSummary; @@ -378,11 +377,13 @@ public final class FSImageFormatPBINode { final BlockInfo ucBlk; if (isStriped) { BlockInfoStriped striped = (BlockInfoStriped) lastBlk; - ucBlk = new BlockInfoStripedUnderConstruction(striped, 
ecPolicy); + ucBlk = new BlockInfoStriped(striped, ecPolicy); } else { - ucBlk = new BlockInfoContiguousUnderConstruction(lastBlk, + ucBlk = new BlockInfoContiguous(lastBlk, replication); } + ucBlk.convertToBlockUnderConstruction( + HdfsServerConstants.BlockUCState.UNDER_CONSTRUCTION, null); file.setBlock(file.numBlocks() - 1, ucBlk); } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/164cbe64/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageSerialization.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageSerialization.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageSerialization.java index af3f813..0567efd 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageSerialization.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageSerialization.java @@ -34,7 +34,6 @@ import org.apache.hadoop.hdfs.protocol.CacheDirectiveInfo; import org.apache.hadoop.hdfs.protocol.CachePoolInfo; import org.apache.hadoop.hdfs.protocol.LayoutVersion; import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoContiguous; -import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoContiguousUnderConstruction; import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.BlockUCState; import org.apache.hadoop.hdfs.server.namenode.snapshot.SnapshotFSImageFormat; import org.apache.hadoop.hdfs.server.namenode.snapshot.SnapshotFSImageFormat.ReferenceMap; @@ -138,8 +137,9 @@ public class FSImageSerialization { // last block is UNDER_CONSTRUCTION if(numBlocks > 0) { blk.readFields(in); - blocksContiguous[i] = new BlockInfoContiguousUnderConstruction( - blk, blockReplication, BlockUCState.UNDER_CONSTRUCTION, null); + blocksContiguous[i] = new BlockInfoContiguous(blk, 
blockReplication); + blocksContiguous[i].convertToBlockUnderConstruction( + BlockUCState.UNDER_CONSTRUCTION, null); } PermissionStatus perm = PermissionStatus.read(in);
