This is an automated email from the ASF dual-hosted git repository.

hexiaoqiao pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git
The following commit(s) were added to refs/heads/trunk by this push: new b05c0ce9724 HDFS-17496. DataNode supports more fine-grained dataset lock based on blockid. (#7280). Contributed by hfutatzhanghb. b05c0ce9724 is described below commit b05c0ce97247551b33ab33422cbe8d5358c055a0 Author: hfutatzhanghb <hfutzhan...@163.com> AuthorDate: Thu Feb 27 20:18:50 2025 +0800 HDFS-17496. DataNode supports more fine-grained dataset lock based on blockid. (#7280). Contributed by hfutatzhanghb. Signed-off-by: He Xiaoqiao <hexiaoq...@apache.org> --- .../hdfs/server/common/DataNodeLockManager.java | 9 +- .../datanode/DataNodeLayoutSubLockStrategy.java | 33 ++++++ .../hdfs/server/datanode/DataSetLockManager.java | 38 ++++++- .../DataSetSubLockStrategy.java} | 43 ++------ .../hadoop/hdfs/server/datanode/DatanodeUtil.java | 34 +++++- .../datanode/fsdataset/impl/FsDatasetImpl.java | 118 ++++++++++++++------- .../server/datanode/TestDataSetLockManager.java | 11 ++ .../datanode/fsdataset/impl/TestFsDatasetImpl.java | 7 +- .../hdfs/server/namenode/ha/TestDNFencing.java | 11 +- 9 files changed, 221 insertions(+), 83 deletions(-) diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/DataNodeLockManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/DataNodeLockManager.java index e7a3b38357a..49e8b626efb 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/DataNodeLockManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/DataNodeLockManager.java @@ -24,12 +24,17 @@ public interface DataNodeLockManager<T extends AutoCloseDataSetLock> { /** - * Acquire block pool level first if you want to Acquire volume lock. + * Acquire the block pool level and volume level locks first if you want to acquire a dir lock. * Or only acquire block pool level lock. + * There are several sequential locking patterns, as below: + * 1. block pool + * 2. block pool -> volume + * 3. block pool -> volume -> dir */ enum LockLevel { BLOCK_POOl, - VOLUME + VOLUME, + DIR } /** diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNodeLayoutSubLockStrategy.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNodeLayoutSubLockStrategy.java new file mode 100644 index 00000000000..7665d9c1cf3 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNodeLayoutSubLockStrategy.java @@ -0,0 +1,33 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.hdfs.server.datanode; + +import java.util.List; + +public class DataNodeLayoutSubLockStrategy implements DataSetSubLockStrategy { + @Override + public String blockIdToSubLock(long blockid) { + return DatanodeUtil.idToBlockDirSuffix(blockid); + } + + @Override + public List<String> getAllSubLockNames() { + return DatanodeUtil.getAllSubDirNameForDataSetLock(); + } +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataSetLockManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataSetLockManager.java index 3abcf12fc8b..dafbb4ed502 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataSetLockManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataSetLockManager.java @@ -96,6 +96,13 @@ private String generateLockName(LockLevel level, String... resources) { + resources[0] + "volume lock :" + resources[1]); } return resources[0] + resources[1]; + } else if (resources.length == 3 && level == LockLevel.DIR) { + if (resources[0] == null || resources[1] == null || resources[2] == null) { + throw new IllegalArgumentException("acquire a null dataset lock : " + + resources[0] + ",volume lock :" + resources[1] + + ",subdir lock :" + resources[2]); + } + return resources[0] + resources[1] + resources[2]; } else { throw new IllegalArgumentException("lock level do not match resource"); } @@ -156,7 +163,7 @@ public DataSetLockManager(Configuration conf, DataNode dn) { public AutoCloseDataSetLock readLock(LockLevel level, String... resources) { if (level == LockLevel.BLOCK_POOl) { return getReadLock(level, resources[0]); - } else { + } else if (level == LockLevel.VOLUME){ AutoCloseDataSetLock bpLock = getReadLock(LockLevel.BLOCK_POOl, resources[0]); AutoCloseDataSetLock volLock = getReadLock(level, resources); volLock.setParentLock(bpLock); @@ -165,6 +172,17 @@ public AutoCloseDataSetLock readLock(LockLevel level, String... resources) { resources[0]); } return volLock; + } else { + AutoCloseDataSetLock bpLock = getReadLock(LockLevel.BLOCK_POOl, resources[0]); + AutoCloseDataSetLock volLock = getReadLock(LockLevel.VOLUME, resources[0], resources[1]); + volLock.setParentLock(bpLock); + AutoCloseDataSetLock dirLock = getReadLock(level, resources); + dirLock.setParentLock(volLock); + if (openLockTrace) { + LOG.debug("Sub lock " + resources[0] + resources[1] + resources[2] + " parent lock " + + resources[0] + resources[1]); + } + return dirLock; } } @@ -172,7 +190,7 @@ public AutoCloseDataSetLock readLock(LockLevel level, String... resources) { public AutoCloseDataSetLock writeLock(LockLevel level, String... resources) { if (level == LockLevel.BLOCK_POOl) { return getWriteLock(level, resources[0]); - } else { + } else if (level == LockLevel.VOLUME) { AutoCloseDataSetLock bpLock = getReadLock(LockLevel.BLOCK_POOl, resources[0]); AutoCloseDataSetLock volLock = getWriteLock(level, resources); volLock.setParentLock(bpLock); @@ -181,6 +199,17 @@ public AutoCloseDataSetLock writeLock(LockLevel level, String... 
resources) { resources[0]); } return volLock; + } else { + AutoCloseDataSetLock bpLock = getReadLock(LockLevel.BLOCK_POOl, resources[0]); + AutoCloseDataSetLock volLock = getReadLock(LockLevel.VOLUME, resources[0], resources[1]); + volLock.setParentLock(bpLock); + AutoCloseDataSetLock dirLock = getWriteLock(level, resources); + dirLock.setParentLock(volLock); + if (openLockTrace) { + LOG.debug("Sub lock " + resources[0] + resources[1] + resources[2] + " parent lock " + + resources[0] + resources[1]); + } + return dirLock; } } @@ -235,8 +264,13 @@ public void addLock(LockLevel level, String... resources) { String lockName = generateLockName(level, resources); if (level == LockLevel.BLOCK_POOl) { lockMap.addLock(lockName, new ReentrantReadWriteLock(isFair)); + } else if (level == LockLevel.VOLUME) { + lockMap.addLock(resources[0], new ReentrantReadWriteLock(isFair)); + lockMap.addLock(lockName, new ReentrantReadWriteLock(isFair)); } else { lockMap.addLock(resources[0], new ReentrantReadWriteLock(isFair)); + lockMap.addLock(generateLockName(LockLevel.VOLUME, resources[0], resources[1]), + new ReentrantReadWriteLock(isFair)); lockMap.addLock(lockName, new ReentrantReadWriteLock(isFair)); } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/DataNodeLockManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataSetSubLockStrategy.java similarity index 50% copy from hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/DataNodeLockManager.java copy to hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataSetSubLockStrategy.java index e7a3b38357a..f5f09882c18 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/DataNodeLockManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataSetSubLockStrategy.java @@ -16,44 +16,21 @@ * limitations under the License. */ -package org.apache.hadoop.hdfs.server.common; +package org.apache.hadoop.hdfs.server.datanode; + +import java.util.List; /** - * Use for manage a set of lock for datanode. + * This interface is used to generate sub lock name for a blockid. */ -public interface DataNodeLockManager<T extends AutoCloseDataSetLock> { - - /** - * Acquire block pool level first if you want to Acquire volume lock. - * Or only acquire block pool level lock. - */ - enum LockLevel { - BLOCK_POOl, - VOLUME - } +public interface DataSetSubLockStrategy { /** - * Acquire readLock and then lock. + * Generate sub lock name for the given blockid. + * @param blockid the block id. + * @return sub lock name for the input blockid. */ - T readLock(LockLevel level, String... resources); + String blockIdToSubLock(long blockid); - /** - * Acquire writeLock and then lock. - */ - T writeLock(LockLevel level, String... resources); - - /** - * Add a lock to LockManager. - */ - void addLock(LockLevel level, String... resources); - - /** - * Remove a lock from LockManager. - */ - void removeLock(LockLevel level, String... resources); - - /** - * LockManager may need to back hook. 
- */ - void hook(); + List<String> getAllSubLockNames(); } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DatanodeUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DatanodeUtil.java index c98ff5413bd..4961eeba51c 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DatanodeUtil.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DatanodeUtil.java @@ -21,6 +21,8 @@ import java.io.FileInputStream; import java.io.FileNotFoundException; import java.io.IOException; +import java.util.ArrayList; +import java.util.List; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.hdfs.protocol.Block; @@ -37,6 +39,7 @@ public class DatanodeUtil { public static final String DISK_ERROR = "Possible disk error: "; private static final String SEP = System.getProperty("file.separator"); + private static final long MASK = 0x1F; /** Get the cause of an I/O exception if caused by a possible disk error * @param ioe an I/O exception @@ -112,6 +115,21 @@ public static boolean dirNoFilesRecursive( return true; } + /** + * Take an example. + * We have a block whose blockid maps to: + * "/data1/hadoop/hdfs/datanode/current/BP-xxxx/current/finalized/subdir0/subdir1" + * We return "subdir0/subdir1". + * @param blockId the block id. + * @return the two-level subdir string where the block will be stored. + */ + public static String idToBlockDirSuffix(long blockId) { + int d1 = (int) ((blockId >> 16) & MASK); + int d2 = (int) ((blockId >> 8) & MASK); + return DataStorage.BLOCK_SUBDIR_PREFIX + d1 + SEP + + DataStorage.BLOCK_SUBDIR_PREFIX + d2; + } + /** * Get the directory where a finalized block with this ID should be stored. * Do not attempt to create the directory. @@ -120,13 +138,21 @@ public static boolean dirNoFilesRecursive( * @return */ public static File idToBlockDir(File root, long blockId) { - int d1 = (int) ((blockId >> 16) & 0x1F); - int d2 = (int) ((blockId >> 8) & 0x1F); - String path = DataStorage.BLOCK_SUBDIR_PREFIX + d1 + SEP + - DataStorage.BLOCK_SUBDIR_PREFIX + d2; + String path = idToBlockDirSuffix(blockId); return new File(root, path); } + public static List<String> getAllSubDirNameForDataSetLock() { + List<String> res = new ArrayList<>(); + for (int d1 = 0; d1 <= MASK; d1++) { + for (int d2 = 0; d2 <= MASK; d2++) { + res.add(DataStorage.BLOCK_SUBDIR_PREFIX + d1 + SEP + + DataStorage.BLOCK_SUBDIR_PREFIX + d2); + } + } + return res; + } + /** * @return the FileInputStream for the meta data of the given block. 
* @throws FileNotFoundException diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java index eeec1bb7288..934c5faee2f 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java @@ -64,7 +64,9 @@ import org.apache.hadoop.hdfs.server.common.DataNodeLockManager; import org.apache.hadoop.hdfs.server.common.DataNodeLockManager.LockLevel; import org.apache.hadoop.hdfs.server.datanode.DataNodeFaultInjector; +import org.apache.hadoop.hdfs.server.datanode.DataNodeLayoutSubLockStrategy; import org.apache.hadoop.hdfs.server.datanode.DataSetLockManager; +import org.apache.hadoop.hdfs.server.datanode.DataSetSubLockStrategy; import org.apache.hadoop.hdfs.server.datanode.FileIoProvider; import org.apache.hadoop.hdfs.server.datanode.FinalizedReplica; import org.apache.hadoop.hdfs.server.datanode.LocalReplica; @@ -198,8 +200,9 @@ public FsVolumeImpl getVolume(final ExtendedBlock b) { @Override // FsDatasetSpi public Block getStoredBlock(String bpid, long blkid) throws IOException { - try (AutoCloseableLock lock = lockManager.readLock(LockLevel.BLOCK_POOl, - bpid)) { + try (AutoCloseableLock lock = lockManager.readLock(LockLevel.DIR, + bpid, getReplicaInfo(bpid, blkid).getStorageUuid(), + datasetSubLockStrategy.blockIdToSubLock(blkid))) { ReplicaInfo r = volumeMap.get(bpid, blkid); if (r == null) { return null; @@ -288,6 +291,8 @@ public LengthInputStream getMetaDataInputStream(ExtendedBlock b) private long lastDirScannerNotifyTime; private volatile long lastDirScannerFinishTime; + private final DataSetSubLockStrategy datasetSubLockStrategy; + /** * An FSDataset has a directory where it loads its data files. 
*/ @@ -392,6 +397,7 @@ public LengthInputStream getMetaDataInputStream(ExtendedBlock b) DFSConfigKeys.DFS_DATANODE_DIRECTORYSCAN_MAX_NOTIFY_COUNT_KEY, DFSConfigKeys.DFS_DATANODE_DIRECTORYSCAN_MAX_NOTIFY_COUNT_DEFAULT); lastDirScannerNotifyTime = System.currentTimeMillis(); + this.datasetSubLockStrategy = new DataNodeLayoutSubLockStrategy(); } /** @@ -430,6 +436,12 @@ private synchronized void activateVolume( FsVolumeReference ref) throws IOException { for (String bp : volumeMap.getBlockPoolList()) { lockManager.addLock(LockLevel.VOLUME, bp, ref.getVolume().getStorageID()); + List<String> allSubDirNameForDataSetLock = datasetSubLockStrategy.getAllSubLockNames(); + for (String dir : allSubDirNameForDataSetLock) { + lockManager.addLock(LockLevel.DIR, bp, ref.getVolume().getStorageID(), dir); + LOG.info("Added DIR lock for bpid:{}, volume storageid:{}, dir:{}", + bp, ref.getVolume().getStorageID(), dir); + } } DatanodeStorage dnStorage = storageMap.get(sd.getStorageUuid()); if (dnStorage != null) { @@ -629,6 +641,12 @@ public void removeVolumes( for (String storageUuid : storageToRemove) { storageMap.remove(storageUuid); for (String bp : volumeMap.getBlockPoolList()) { + List<String> allSubDirNameForDataSetLock = datasetSubLockStrategy.getAllSubLockNames(); + for (String dir : allSubDirNameForDataSetLock) { + lockManager.removeLock(LockLevel.DIR, bp, storageUuid, dir); + LOG.info("Removed DIR lock for bpid:{}, volume storageid:{}, dir:{}", + bp, storageUuid, dir); + } lockManager.removeLock(LockLevel.VOLUME, bp, storageUuid); } } @@ -819,8 +837,9 @@ public InputStream getBlockInputStream(ExtendedBlock b, long seekOffset) throws IOException { ReplicaInfo info; - try (AutoCloseableLock lock = lockManager.readLock(LockLevel.BLOCK_POOl, - b.getBlockPoolId())) { + try (AutoCloseableLock lock = lockManager.readLock(LockLevel.DIR, + b.getBlockPoolId(), getStorageUuidForLock(b), + datasetSubLockStrategy.blockIdToSubLock(b.getBlockId()))) { info = volumeMap.get(b.getBlockPoolId(), b.getLocalBlock()); } @@ -914,8 +933,9 @@ String getStorageUuidForLock(ExtendedBlock b) @Override // FsDatasetSpi public ReplicaInputStreams getTmpInputStreams(ExtendedBlock b, long blkOffset, long metaOffset) throws IOException { - try (AutoCloseDataSetLock l = lockManager.readLock(LockLevel.VOLUME, - b.getBlockPoolId(), getStorageUuidForLock(b))) { + try (AutoCloseDataSetLock l = lockManager.readLock(LockLevel.DIR, + b.getBlockPoolId(), getStorageUuidForLock(b), + datasetSubLockStrategy.blockIdToSubLock(b.getBlockId()))) { ReplicaInfo info = getReplicaInfo(b); FsVolumeReference ref = info.getVolume().obtainReference(); try { @@ -1380,8 +1400,9 @@ static void computeChecksum(ReplicaInfo srcReplica, File dstMeta, @Override // FsDatasetSpi public ReplicaHandler append(ExtendedBlock b, long newGS, long expectedBlockLen) throws IOException { - try (AutoCloseableLock lock = lockManager.writeLock(LockLevel.VOLUME, - b.getBlockPoolId(), getStorageUuidForLock(b))) { + try (AutoCloseableLock lock = lockManager.writeLock(LockLevel.DIR, + b.getBlockPoolId(), getStorageUuidForLock(b), + datasetSubLockStrategy.blockIdToSubLock(b.getBlockId()))) { // If the block was successfully finalized because all packets // were successfully processed at the Datanode but the ack for // some of the packets were not received by the client. 
The client @@ -1433,8 +1454,9 @@ public ReplicaHandler append(ExtendedBlock b, private ReplicaInPipeline append(String bpid, ReplicaInfo replicaInfo, long newGS, long estimateBlockLen) throws IOException { - try (AutoCloseableLock lock = lockManager.writeLock(LockLevel.VOLUME, - bpid, replicaInfo.getStorageUuid())) { + try (AutoCloseableLock lock = lockManager.writeLock(LockLevel.DIR, + bpid, replicaInfo.getStorageUuid(), + datasetSubLockStrategy.blockIdToSubLock(replicaInfo.getBlockId()))) { // If the block is cached, start uncaching it. if (replicaInfo.getState() != ReplicaState.FINALIZED) { throw new IOException("Only a Finalized replica can be appended to; " @@ -1530,8 +1552,9 @@ public ReplicaHandler recoverAppend( while (true) { try { - try (AutoCloseableLock lock = lockManager.writeLock(LockLevel.BLOCK_POOl, - b.getBlockPoolId())) { + try (AutoCloseableLock lock = lockManager.writeLock(LockLevel.DIR, + b.getBlockPoolId(), getStorageUuidForLock(b), + datasetSubLockStrategy.blockIdToSubLock(b.getBlockId()))) { ReplicaInfo replicaInfo = recoverCheck(b, newGS, expectedBlockLen); FsVolumeReference ref = replicaInfo.getVolume().obtainReference(); ReplicaInPipeline replica; @@ -1564,8 +1587,9 @@ public Replica recoverClose(ExtendedBlock b, long newGS, b, newGS, expectedBlockLen); while (true) { try { - try (AutoCloseableLock lock = lockManager.writeLock(LockLevel.VOLUME, - b.getBlockPoolId(), getStorageUuidForLock(b))) { + try (AutoCloseableLock lock = lockManager.writeLock(LockLevel.DIR, + b.getBlockPoolId(), getStorageUuidForLock(b), + datasetSubLockStrategy.blockIdToSubLock(b.getBlockId()))) { // check replica's state ReplicaInfo replicaInfo = recoverCheck(b, newGS, expectedBlockLen); // bump the replica's GS @@ -1650,8 +1674,9 @@ public ReplicaHandler createRbw( } ReplicaInPipeline newReplicaInfo; - try (AutoCloseableLock l = lockManager.writeLock(LockLevel.VOLUME, - b.getBlockPoolId(), v.getStorageID())) { + try (AutoCloseableLock l = lockManager.writeLock(LockLevel.DIR, + b.getBlockPoolId(), v.getStorageID(), + datasetSubLockStrategy.blockIdToSubLock(b.getBlockId()))) { newReplicaInfo = v.createRbw(b); if (newReplicaInfo.getReplicaInfo().getState() != ReplicaState.RBW) { throw new IOException("CreateRBW returned a replica of state " @@ -1681,8 +1706,9 @@ public ReplicaHandler recoverRbw( try { while (true) { try { - try (AutoCloseableLock lock = lockManager.writeLock(LockLevel.VOLUME, - b.getBlockPoolId(), getStorageUuidForLock(b))) { + try (AutoCloseableLock lock = lockManager.writeLock(LockLevel.DIR, + b.getBlockPoolId(), getStorageUuidForLock(b), + datasetSubLockStrategy.blockIdToSubLock(b.getBlockId()))) { ReplicaInfo replicaInfo = getReplicaInfo(b.getBlockPoolId(), b.getBlockId()); // check the replica's state @@ -1713,8 +1739,9 @@ public ReplicaHandler recoverRbw( private ReplicaHandler recoverRbwImpl(ReplicaInPipeline rbw, ExtendedBlock b, long newGS, long minBytesRcvd, long maxBytesRcvd) throws IOException { - try (AutoCloseableLock lock = lockManager.writeLock(LockLevel.VOLUME, - b.getBlockPoolId(), getStorageUuidForLock(b))) { + try (AutoCloseableLock lock = lockManager.writeLock(LockLevel.DIR, + b.getBlockPoolId(), getStorageUuidForLock(b), + datasetSubLockStrategy.blockIdToSubLock(b.getBlockId()))) { // check generation stamp long replicaGenerationStamp = rbw.getGenerationStamp(); if (replicaGenerationStamp < b.getGenerationStamp() || @@ -1775,8 +1802,9 @@ private ReplicaHandler recoverRbwImpl(ReplicaInPipeline rbw, public ReplicaInPipeline convertTemporaryToRbw( final 
ExtendedBlock b) throws IOException { long startTimeMs = Time.monotonicNow(); - try (AutoCloseableLock lock = lockManager.writeLock(LockLevel.VOLUME, - b.getBlockPoolId(), getStorageUuidForLock(b))) { + try (AutoCloseableLock lock = lockManager.writeLock(LockLevel.DIR, + b.getBlockPoolId(), getStorageUuidForLock(b), + datasetSubLockStrategy.blockIdToSubLock(b.getBlockId()))) { final long blockId = b.getBlockId(); final long expectedGs = b.getGenerationStamp(); final long visible = b.getNumBytes(); @@ -1915,8 +1943,9 @@ public ReplicaHandler createTemporary(StorageType storageType, .getNumBytes()); FsVolumeImpl v = (FsVolumeImpl) ref.getVolume(); ReplicaInPipeline newReplicaInfo; - try (AutoCloseableLock lock = lockManager.writeLock(LockLevel.VOLUME, - b.getBlockPoolId(), v.getStorageID())) { + try (AutoCloseableLock lock = lockManager.writeLock(LockLevel.DIR, + b.getBlockPoolId(), v.getStorageID(), + datasetSubLockStrategy.blockIdToSubLock(b.getBlockId()))) { try { newReplicaInfo = v.createTemporary(b); LOG.debug("creating temporary for block: {} on volume: {}", @@ -1973,8 +2002,9 @@ public void finalizeBlock(ExtendedBlock b, boolean fsyncDir) ReplicaInfo replicaInfo = null; ReplicaInfo finalizedReplicaInfo = null; long startTimeMs = Time.monotonicNow(); - try (AutoCloseableLock lock = lockManager.writeLock(LockLevel.VOLUME, - b.getBlockPoolId(), getStorageUuidForLock(b))) { + try (AutoCloseableLock lock = lockManager.writeLock(LockLevel.DIR, + b.getBlockPoolId(), getStorageUuidForLock(b), + datasetSubLockStrategy.blockIdToSubLock(b.getBlockId()))) { if (Thread.interrupted()) { // Don't allow data modifications from interrupted threads throw new IOException("Cannot finalize block: " + b + " from Interrupted Thread"); @@ -2010,8 +2040,9 @@ public void finalizeBlock(ExtendedBlock b, boolean fsyncDir) private ReplicaInfo finalizeReplica(String bpid, ReplicaInfo replicaInfo) throws IOException { - try (AutoCloseableLock lock = lockManager.writeLock(LockLevel.VOLUME, - bpid, replicaInfo.getStorageUuid())) { + try (AutoCloseableLock lock = lockManager.writeLock(LockLevel.DIR, + bpid, replicaInfo.getStorageUuid(), + datasetSubLockStrategy.blockIdToSubLock(replicaInfo.getBlockId()))) { // Compare generation stamp of old and new replica before finalizing if (volumeMap.get(bpid, replicaInfo.getBlockId()).getGenerationStamp() > replicaInfo.getGenerationStamp()) { @@ -2060,8 +2091,9 @@ private ReplicaInfo finalizeReplica(String bpid, ReplicaInfo replicaInfo) @Override // FsDatasetSpi public void unfinalizeBlock(ExtendedBlock b) throws IOException { long startTimeMs = Time.monotonicNow(); - try (AutoCloseableLock lock = lockManager.writeLock(LockLevel.VOLUME, - b.getBlockPoolId(), getStorageUuidForLock(b))) { + try (AutoCloseableLock lock = lockManager.writeLock(LockLevel.DIR, + b.getBlockPoolId(), getStorageUuidForLock(b), + datasetSubLockStrategy.blockIdToSubLock(b.getBlockId()))) { ReplicaInfo replicaInfo = volumeMap.get(b.getBlockPoolId(), b.getLocalBlock()); if (replicaInfo != null && @@ -2459,7 +2491,8 @@ boolean removeReplicaFromMem(final ExtendedBlock block, final FsVolumeImpl volum final String bpid = block.getBlockPoolId(); final Block localBlock = block.getLocalBlock(); final long blockId = localBlock.getBlockId(); - try (AutoCloseableLock lock = lockManager.writeLock(LockLevel.BLOCK_POOl, bpid)) { + try (AutoCloseableLock lock = lockManager.writeLock(LockLevel.DIR, bpid, volume.getStorageID(), + datasetSubLockStrategy.blockIdToSubLock(blockId))) { final ReplicaInfo info = 
volumeMap.get(bpid, localBlock); if (info == null) { ReplicaInfo infoByBlockId = volumeMap.get(bpid, blockId); @@ -2548,8 +2581,8 @@ private void cacheBlock(String bpid, long blockId) { bpid + ": ReplicaInfo not found."); return; } - try (AutoCloseableLock lock = lockManager.writeLock(LockLevel.VOLUME, bpid, - info.getStorageUuid())) { + try (AutoCloseableLock lock = lockManager.writeLock(LockLevel.DIR, bpid, + info.getStorageUuid(), datasetSubLockStrategy.blockIdToSubLock(blockId))) { boolean success = false; try { info = volumeMap.get(bpid, blockId); @@ -2746,7 +2779,8 @@ public void checkAndUpdate(String bpid, ScanInfo scanInfo) lastDirScannerNotifyTime = startTimeMs; } String storageUuid = vol.getStorageID(); - try (AutoCloseableLock lock = lockManager.writeLock(LockLevel.VOLUME, bpid, storageUuid)) { + try (AutoCloseableLock lock = lockManager.writeLock(LockLevel.DIR, bpid, + vol.getStorageID(), datasetSubLockStrategy.blockIdToSubLock(blockId))) { if (!storageMap.containsKey(storageUuid)) { // Storage was already removed return; @@ -3231,8 +3265,9 @@ private ReplicaInfo updateReplicaUnderRecovery( @Override // FsDatasetSpi public long getReplicaVisibleLength(final ExtendedBlock block) throws IOException { - try (AutoCloseableLock lock = lockManager.readLock(LockLevel.BLOCK_POOl, - block.getBlockPoolId())) { + try (AutoCloseableLock lock = lockManager.readLock(LockLevel.DIR, + block.getBlockPoolId(), getStorageUuidForLock(block), + datasetSubLockStrategy.blockIdToSubLock(block.getBlockId()))) { final Replica replica = getReplicaInfo(block.getBlockPoolId(), block.getBlockId()); if (replica.getGenerationStamp() < block.getGenerationStamp()) { @@ -3259,6 +3294,12 @@ public void addBlockPool(String bpid, Configuration conf) Set<String> vols = storageMap.keySet(); for (String v : vols) { lockManager.addLock(LockLevel.VOLUME, bpid, v); + List<String> allSubDirNameForDataSetLock = datasetSubLockStrategy.getAllSubLockNames(); + for (String dir : allSubDirNameForDataSetLock) { + lockManager.addLock(LockLevel.DIR, bpid, v, dir); + LOG.info("Added DIR lock for bpid:{}, volume storageid:{}, dir:{}", + bpid, v, dir); + } } } try { @@ -3386,8 +3427,9 @@ public void deleteBlockPool(String bpid, boolean force) @Override // FsDatasetSpi public BlockLocalPathInfo getBlockLocalPathInfo(ExtendedBlock block) throws IOException { - try (AutoCloseableLock lock = lockManager.readLock(LockLevel.BLOCK_POOl, - block.getBlockPoolId())) { + try (AutoCloseableLock lock = lockManager.readLock(LockLevel.DIR, + block.getBlockPoolId(), getStorageUuidForLock(block), + datasetSubLockStrategy.blockIdToSubLock(block.getBlockId()))) { final Replica replica = volumeMap.get(block.getBlockPoolId(), block.getBlockId()); if (replica == null) { diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataSetLockManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataSetLockManager.java index b514accdf16..6cb12d2681f 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataSetLockManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataSetLockManager.java @@ -37,6 +37,7 @@ public void init() { public void testBaseFunc() { manager.addLock(LockLevel.BLOCK_POOl, "BPtest"); manager.addLock(LockLevel.VOLUME, "BPtest", "Volumetest"); + manager.addLock(LockLevel.DIR, "BPtest", "Volumetest", "SubDirtest"); AutoCloseDataSetLock lock = 
manager.writeLock(LockLevel.BLOCK_POOl, "BPtest"); AutoCloseDataSetLock lock1 = manager.readLock(LockLevel.BLOCK_POOl, "BPtest"); @@ -62,6 +63,16 @@ public void testBaseFunc() { manager.lockLeakCheck(); assertNull(manager.getLastException()); + AutoCloseDataSetLock lock6 = manager.writeLock(LockLevel.BLOCK_POOl, "BPtest"); + AutoCloseDataSetLock lock7 = manager.readLock(LockLevel.VOLUME, "BPtest", "Volumetest"); + AutoCloseDataSetLock lock8 = manager.readLock(LockLevel.DIR, + "BPtest", "Volumetest", "SubDirtest"); + lock8.close(); + lock7.close(); + lock6.close(); + manager.lockLeakCheck(); + assertNull(manager.getLastException()); + manager.writeLock(LockLevel.VOLUME, "BPtest", "Volumetest"); manager.lockLeakCheck(); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsDatasetImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsDatasetImpl.java index 975874edb1f..f58ee729ef9 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsDatasetImpl.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsDatasetImpl.java @@ -1946,7 +1946,12 @@ public void delayDeleteReplica() { assertFalse(uuids.contains(dn.getDatanodeUuid())); // This replica has deleted from datanode memory. - assertNull(ds.getStoredBlock(bpid, extendedBlock.getBlockId())); + try { + Block storedBlock = ds.getStoredBlock(bpid, extendedBlock.getBlockId()); + assertNull(storedBlock); + } catch (Exception e) { + GenericTestUtils.assertExceptionContains("ReplicaNotFoundException", e); + } } finally { cluster.shutdown(); DataNodeFaultInjector.set(oldInjector); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestDNFencing.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestDNFencing.java index 177b7bca4ce..fa84a204a5d 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestDNFencing.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestDNFencing.java @@ -50,6 +50,7 @@ import org.apache.hadoop.hdfs.server.datanode.DataNode; import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils; import org.apache.hadoop.hdfs.server.datanode.InternalDataNodeTestUtils; +import org.apache.hadoop.hdfs.server.datanode.ReplicaNotFoundException; import org.apache.hadoop.hdfs.server.namenode.NameNode; import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter; import org.apache.hadoop.hdfs.util.RwLockMode; @@ -597,9 +598,13 @@ private int getTrueReplication(MiniDFSCluster cluster, ExtendedBlock block) throws IOException { int count = 0; for (DataNode dn : cluster.getDataNodes()) { - if (DataNodeTestUtils.getFSDataset(dn).getStoredBlock( - block.getBlockPoolId(), block.getBlockId()) != null) { - count++; + try { + if (DataNodeTestUtils.getFSDataset(dn).getStoredBlock( + block.getBlockPoolId(), block.getBlockId()) != null) { + count++; + } + } catch (ReplicaNotFoundException e) { + continue; } } return count; --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org For additional commands, e-mail: common-commits-h...@hadoop.apache.org
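
A note for readers skimming the flattened diff above: with this change, a DIR-level operation takes the block pool lock and the volume lock as read locks before touching the subdir lock, so writers on different subdirs of the same volume no longer serialize on a volume-wide write lock. The standalone sketch below is illustrative only (the class name and the plain ReentrantReadWriteLocks are stand-ins for DataSetLockManager's lock map); it shows the acquire/release discipline the patch encodes:

import java.util.concurrent.locks.ReentrantReadWriteLock;

// Illustrative stand-in for DataSetLockManager's three-level ordering:
// BLOCK_POOl -> VOLUME -> DIR, with parent levels held as read locks.
public class LockOrderSketch {
  public static void main(String[] args) {
    ReentrantReadWriteLock bpLock = new ReentrantReadWriteLock(true);  // block pool level
    ReentrantReadWriteLock volLock = new ReentrantReadWriteLock(true); // volume level
    ReentrantReadWriteLock dirLock = new ReentrantReadWriteLock(true); // subdir level

    // Acquire coarse-to-fine; parents are taken as read locks so writers
    // on different subdirs of the same volume can proceed concurrently.
    bpLock.readLock().lock();
    volLock.readLock().lock();
    dirLock.writeLock().lock();
    try {
      System.out.println("holding BP(read) -> VOLUME(read) -> DIR(write)");
    } finally {
      // Release fine-to-coarse, as AutoCloseDataSetLock's parent chaining does.
      dirLock.writeLock().unlock();
      volLock.readLock().unlock();
      bpLock.readLock().unlock();
    }
  }
}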
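
A worked example of the sub-lock naming may also help, since the bit math in DatanodeUtil.idToBlockDirSuffix is easy to misread: bits 16..20 of the block ID select the first-level subdir and bits 8..12 select the second, giving 32 x 32 = 1024 fixed sub-lock names per block pool and volume, the same set that getAllSubDirNameForDataSetLock() pre-registers. The standalone sketch below mirrors that method under the assumption of a Unix "/" file.separator; the class name and sample block ID are illustrative:

// Standalone sketch mirroring DatanodeUtil.idToBlockDirSuffix (not DataNode code).
public class SubLockNameSketch {
  private static final long MASK = 0x1F; // keep 5 bits -> 32 values per level

  static String blockIdToSubLock(long blockId) {
    int d1 = (int) ((blockId >> 16) & MASK); // first-level subdir index
    int d2 = (int) ((blockId >> 8) & MASK);  // second-level subdir index
    return "subdir" + d1 + "/" + "subdir" + d2;
  }

  public static void main(String[] args) {
    // 0x12345: (0x12345 >> 16) & 0x1F = 1 and (0x12345 >> 8) & 0x1F = 3,
    // so the block's sub-lock (and on-disk suffix) is subdir1/subdir3.
    System.out.println(blockIdToSubLock(0x12345L));
  }
}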