This is an automated email from the ASF dual-hosted git repository.
hexiaoqiao pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git
The following commit(s) were added to refs/heads/trunk by this push:
new 94d6a77c394 HDFS-17496. DataNode supports more fine-grained dataset
lock based on blockid. (#6764). Contributed by farmmamba.
94d6a77c394 is described below
commit 94d6a77c39452c82ba78cef2cd96f5c8ff4fcfa3
Author: hfutatzhanghb <[email protected]>
AuthorDate: Thu Jan 2 18:41:43 2025 +0800
HDFS-17496. DataNode supports more fine-grained dataset lock based on
blockid. (#6764). Contributed by farmmamba.
Signed-off-by: He Xiaoqiao <[email protected]>
---
.../java/org/apache/hadoop/hdfs/DFSConfigKeys.java | 4 +
.../hdfs/server/common/DataNodeLockManager.java | 3 +-
.../hdfs/server/datanode/DataSetLockManager.java | 38 ++++++-
.../DataSetSubLockStrategy.java} | 43 ++------
.../server/datanode/ModDataSetSubLockStrategy.java | 53 +++++++++
.../datanode/fsdataset/impl/FsDatasetImpl.java | 121 ++++++++++++++-------
.../src/main/resources/hdfs-default.xml | 9 ++
.../server/datanode/TestDataSetLockManager.java | 11 ++
.../datanode/fsdataset/impl/TestFsDatasetImpl.java | 7 +-
.../hdfs/server/namenode/ha/TestDNFencing.java | 11 +-
10 files changed, 222 insertions(+), 78 deletions(-)
diff --git
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
index dd3193fdadf..d85e7c58231 100755
---
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
+++
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
@@ -1744,6 +1744,10 @@ public class DFSConfigKeys extends
CommonConfigurationKeys {
public static final boolean
DFS_DATANODE_LOCKMANAGER_TRACE_DEFAULT = false;
+ public static final String DFS_DATANODE_DATASET_SUBLOCK_COUNT_KEY =
+ "dfs.datanode.dataset.sublock.count";
+ public static final long DFS_DATANODE_DATASET_SUBLOCK_COUNT_DEFAULT = 1000L;
+
// dfs.client.retry confs are moved to HdfsClientConfigKeys.Retry
@Deprecated
public static final String DFS_CLIENT_RETRY_POLICY_ENABLED_KEY
diff --git
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/DataNodeLockManager.java
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/DataNodeLockManager.java
index e7a3b38357a..cb22a057062 100644
---
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/DataNodeLockManager.java
+++
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/DataNodeLockManager.java
@@ -29,7 +29,8 @@ public interface DataNodeLockManager<T extends
AutoCloseDataSetLock> {
*/
enum LockLevel {
BLOCK_POOl,
- VOLUME
+ VOLUME,
+ DIR
}
/**
diff --git
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataSetLockManager.java
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataSetLockManager.java
index 5579541eb72..61492467a41 100644
---
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataSetLockManager.java
+++
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataSetLockManager.java
@@ -94,6 +94,13 @@ public class DataSetLockManager implements
DataNodeLockManager<AutoCloseDataSetL
+ resources[0] + "volume lock :" + resources[1]);
}
return resources[0] + resources[1];
+ } else if (resources.length == 3 && level == LockLevel.DIR) {
+ if (resources[0] == null || resources[1] == null || resources[2] ==
null) {
+ throw new IllegalArgumentException("acquire a null dataset lock : "
+ + resources[0] + ",volume lock :" + resources[1]
+ + ",subdir lock :" + resources[2]);
+ }
+ return resources[0] + resources[1] + resources[2];
} else {
throw new IllegalArgumentException("lock level do not match resource");
}
@@ -153,7 +160,7 @@ public class DataSetLockManager implements
DataNodeLockManager<AutoCloseDataSetL
public AutoCloseDataSetLock readLock(LockLevel level, String... resources) {
if (level == LockLevel.BLOCK_POOl) {
return getReadLock(level, resources[0]);
- } else {
+ } else if (level == LockLevel.VOLUME){
AutoCloseDataSetLock bpLock = getReadLock(LockLevel.BLOCK_POOl,
resources[0]);
AutoCloseDataSetLock volLock = getReadLock(level, resources);
volLock.setParentLock(bpLock);
@@ -162,6 +169,17 @@ public class DataSetLockManager implements
DataNodeLockManager<AutoCloseDataSetL
resources[0]);
}
return volLock;
+ } else {
+ AutoCloseDataSetLock bpLock = getReadLock(LockLevel.BLOCK_POOl,
resources[0]);
+ AutoCloseDataSetLock volLock = getReadLock(LockLevel.VOLUME,
resources[0], resources[1]);
+ volLock.setParentLock(bpLock);
+ AutoCloseDataSetLock dirLock = getReadLock(level, resources);
+ dirLock.setParentLock(volLock);
+ if (openLockTrace) {
+ LOG.debug("Sub lock " + resources[0] + resources[1] + resources[2] + "
parent lock " +
+ resources[0] + resources[1]);
+ }
+ return dirLock;
}
}
@@ -169,7 +187,7 @@ public class DataSetLockManager implements
DataNodeLockManager<AutoCloseDataSetL
public AutoCloseDataSetLock writeLock(LockLevel level, String... resources) {
if (level == LockLevel.BLOCK_POOl) {
return getWriteLock(level, resources[0]);
- } else {
+ } else if (level == LockLevel.VOLUME) {
AutoCloseDataSetLock bpLock = getReadLock(LockLevel.BLOCK_POOl,
resources[0]);
AutoCloseDataSetLock volLock = getWriteLock(level, resources);
volLock.setParentLock(bpLock);
@@ -178,6 +196,17 @@ public class DataSetLockManager implements
DataNodeLockManager<AutoCloseDataSetL
resources[0]);
}
return volLock;
+ } else {
+ AutoCloseDataSetLock bpLock = getReadLock(LockLevel.BLOCK_POOl,
resources[0]);
+ AutoCloseDataSetLock volLock = getReadLock(LockLevel.VOLUME,
resources[0], resources[1]);
+ volLock.setParentLock(bpLock);
+ AutoCloseDataSetLock dirLock = getWriteLock(level, resources);
+ dirLock.setParentLock(volLock);
+ if (openLockTrace) {
+ LOG.debug("Sub lock " + resources[0] + resources[1] + resources[2] + "
parent lock " +
+ resources[0] + resources[1]);
+ }
+ return dirLock;
}
}
@@ -224,8 +253,13 @@ public class DataSetLockManager implements
DataNodeLockManager<AutoCloseDataSetL
String lockName = generateLockName(level, resources);
if (level == LockLevel.BLOCK_POOl) {
lockMap.addLock(lockName, new ReentrantReadWriteLock(isFair));
+ } else if (level == LockLevel.VOLUME) {
+ lockMap.addLock(resources[0], new ReentrantReadWriteLock(isFair));
+ lockMap.addLock(lockName, new ReentrantReadWriteLock(isFair));
} else {
lockMap.addLock(resources[0], new ReentrantReadWriteLock(isFair));
+ lockMap.addLock(generateLockName(LockLevel.VOLUME, resources[0],
resources[1]),
+ new ReentrantReadWriteLock(isFair));
lockMap.addLock(lockName, new ReentrantReadWriteLock(isFair));
}
}
diff --git
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/DataNodeLockManager.java
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataSetSubLockStrategy.java
similarity index 50%
copy from
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/DataNodeLockManager.java
copy to
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataSetSubLockStrategy.java
index e7a3b38357a..7ba1df8df52 100644
---
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/DataNodeLockManager.java
+++
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataSetSubLockStrategy.java
@@ -16,44 +16,21 @@
* limitations under the License.
*/
-package org.apache.hadoop.hdfs.server.common;
+package org.apache.hadoop.hdfs.server.datanode;
+
+import java.util.List;
/**
- * Use for manage a set of lock for datanode.
+ * This interface is used to generate a sub lock name for a given block id.
*/
-public interface DataNodeLockManager<T extends AutoCloseDataSetLock> {
-
- /**
- * Acquire block pool level first if you want to Acquire volume lock.
- * Or only acquire block pool level lock.
- */
- enum LockLevel {
- BLOCK_POOl,
- VOLUME
- }
+public interface DataSetSubLockStrategy {
/**
- * Acquire readLock and then lock.
+ * Generate the sub lock name for the given block id.
+ * @param blockid the block id.
+ * @return the sub lock name for the input block id.
*/
- T readLock(LockLevel level, String... resources);
+ String blockIdToSubLock(long blockid);
- /**
- * Acquire writeLock and then lock.
- */
- T writeLock(LockLevel level, String... resources);
-
- /**
- * Add a lock to LockManager.
- */
- void addLock(LockLevel level, String... resources);
-
- /**
- * Remove a lock from LockManager.
- */
- void removeLock(LockLevel level, String... resources);
-
- /**
- * LockManager may need to back hook.
- */
- void hook();
+ List<String> getAllSubLockName();
}
diff --git
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ModDataSetSubLockStrategy.java
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ModDataSetSubLockStrategy.java
new file mode 100644
index 00000000000..5e736e54716
--- /dev/null
+++
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ModDataSetSubLockStrategy.java
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs.server.datanode;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+
+public class ModDataSetSubLockStrategy implements DataSetSubLockStrategy {
+ public static final Logger LOG =
LoggerFactory.getLogger(DataSetSubLockStrategy.class);
+
+ private static final String LOCK_NAME_PERFIX = "SubLock";
+ private long modFactor;
+
+ public ModDataSetSubLockStrategy(long mod) {
+ if (mod <= 0) {
+ mod = 1L;
+ }
+ this.modFactor = mod;
+ }
+
+ @Override
+ public String blockIdToSubLock(long blockid) {
+ return LOCK_NAME_PERFIX + (blockid % modFactor);
+ }
+
+ @Override
+ public List<String> getAllSubLockName() {
+ List<String> res = new ArrayList<>();
+ for (long i = 0L; i < modFactor; i++) {
+ res.add(LOCK_NAME_PERFIX + i);
+ }
+ return res;
+ }
+}
diff --git
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
index eeec1bb7288..91b12daef81 100644
---
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
+++
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
@@ -65,9 +65,11 @@ import
org.apache.hadoop.hdfs.server.common.DataNodeLockManager;
import org.apache.hadoop.hdfs.server.common.DataNodeLockManager.LockLevel;
import org.apache.hadoop.hdfs.server.datanode.DataNodeFaultInjector;
import org.apache.hadoop.hdfs.server.datanode.DataSetLockManager;
+import org.apache.hadoop.hdfs.server.datanode.DataSetSubLockStrategy;
import org.apache.hadoop.hdfs.server.datanode.FileIoProvider;
import org.apache.hadoop.hdfs.server.datanode.FinalizedReplica;
import org.apache.hadoop.hdfs.server.datanode.LocalReplica;
+import org.apache.hadoop.hdfs.server.datanode.ModDataSetSubLockStrategy;
import org.apache.hadoop.hdfs.server.datanode.metrics.DataNodeMetrics;
import org.apache.hadoop.util.AutoCloseableLock;
import org.apache.hadoop.hdfs.protocol.Block;
@@ -198,8 +200,9 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
@Override // FsDatasetSpi
public Block getStoredBlock(String bpid, long blkid)
throws IOException {
- try (AutoCloseableLock lock = lockManager.readLock(LockLevel.BLOCK_POOl,
- bpid)) {
+ try (AutoCloseableLock lock = lockManager.readLock(LockLevel.DIR,
+ bpid, getReplicaInfo(bpid, blkid).getStorageUuid(),
+ datasetSubLockStrategy.blockIdToSubLock(blkid))) {
ReplicaInfo r = volumeMap.get(bpid, blkid);
if (r == null) {
return null;
@@ -288,6 +291,9 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
private long lastDirScannerNotifyTime;
private volatile long lastDirScannerFinishTime;
+ private final DataSetSubLockStrategy datasetSubLockStrategy;
+ private final long datasetSubLockCount;
+
/**
* An FSDataset has a directory where it loads its data files.
*/
@@ -392,6 +398,9 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
DFSConfigKeys.DFS_DATANODE_DIRECTORYSCAN_MAX_NOTIFY_COUNT_KEY,
DFSConfigKeys.DFS_DATANODE_DIRECTORYSCAN_MAX_NOTIFY_COUNT_DEFAULT);
lastDirScannerNotifyTime = System.currentTimeMillis();
+ datasetSubLockCount =
conf.getLong(DFSConfigKeys.DFS_DATANODE_DATASET_SUBLOCK_COUNT_KEY,
+ DFSConfigKeys.DFS_DATANODE_DATASET_SUBLOCK_COUNT_DEFAULT);
+ this.datasetSubLockStrategy = new
ModDataSetSubLockStrategy(datasetSubLockCount);
}
/**
@@ -430,6 +439,12 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
FsVolumeReference ref) throws IOException {
for (String bp : volumeMap.getBlockPoolList()) {
lockManager.addLock(LockLevel.VOLUME, bp,
ref.getVolume().getStorageID());
+ List<String> allSubDirNameForDataSetLock =
datasetSubLockStrategy.getAllSubLockName();
+ for (String dir : allSubDirNameForDataSetLock) {
+ lockManager.addLock(LockLevel.DIR, bp, ref.getVolume().getStorageID(),
dir);
+ LOG.info("Added DIR lock for bpid:{}, volume storageid:{}, dir:{}",
+ bp, ref.getVolume().getStorageID(), dir);
+ }
}
DatanodeStorage dnStorage = storageMap.get(sd.getStorageUuid());
if (dnStorage != null) {
@@ -629,6 +644,12 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
for (String storageUuid : storageToRemove) {
storageMap.remove(storageUuid);
for (String bp : volumeMap.getBlockPoolList()) {
+ List<String> allSubDirNameForDataSetLock =
datasetSubLockStrategy.getAllSubLockName();
+ for (String dir : allSubDirNameForDataSetLock) {
+ lockManager.removeLock(LockLevel.DIR, bp, storageUuid, dir);
+ LOG.info("Removed DIR lock for bpid:{}, volume storageid:{},
dir:{}",
+ bp, storageUuid, dir);
+ }
lockManager.removeLock(LockLevel.VOLUME, bp, storageUuid);
}
}
@@ -819,8 +840,9 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
long seekOffset) throws IOException {
ReplicaInfo info;
- try (AutoCloseableLock lock = lockManager.readLock(LockLevel.BLOCK_POOl,
- b.getBlockPoolId())) {
+ try (AutoCloseableLock lock = lockManager.readLock(LockLevel.DIR,
+ b.getBlockPoolId(), getStorageUuidForLock(b),
+ datasetSubLockStrategy.blockIdToSubLock(b.getBlockId()))) {
info = volumeMap.get(b.getBlockPoolId(), b.getLocalBlock());
}
@@ -914,8 +936,9 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
@Override // FsDatasetSpi
public ReplicaInputStreams getTmpInputStreams(ExtendedBlock b,
long blkOffset, long metaOffset) throws IOException {
- try (AutoCloseDataSetLock l = lockManager.readLock(LockLevel.VOLUME,
- b.getBlockPoolId(), getStorageUuidForLock(b))) {
+ try (AutoCloseDataSetLock l = lockManager.readLock(LockLevel.DIR,
+ b.getBlockPoolId(), getStorageUuidForLock(b),
+ datasetSubLockStrategy.blockIdToSubLock(b.getBlockId()))) {
ReplicaInfo info = getReplicaInfo(b);
FsVolumeReference ref = info.getVolume().obtainReference();
try {
@@ -1380,8 +1403,9 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl>
{
@Override // FsDatasetSpi
public ReplicaHandler append(ExtendedBlock b,
long newGS, long expectedBlockLen) throws IOException {
- try (AutoCloseableLock lock = lockManager.writeLock(LockLevel.VOLUME,
- b.getBlockPoolId(), getStorageUuidForLock(b))) {
+ try (AutoCloseableLock lock = lockManager.writeLock(LockLevel.DIR,
+ b.getBlockPoolId(), getStorageUuidForLock(b),
+ datasetSubLockStrategy.blockIdToSubLock(b.getBlockId()))) {
// If the block was successfully finalized because all packets
// were successfully processed at the Datanode but the ack for
// some of the packets were not received by the client. The client
@@ -1433,8 +1457,9 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl>
{
private ReplicaInPipeline append(String bpid,
ReplicaInfo replicaInfo, long newGS, long estimateBlockLen)
throws IOException {
- try (AutoCloseableLock lock = lockManager.writeLock(LockLevel.VOLUME,
- bpid, replicaInfo.getStorageUuid())) {
+ try (AutoCloseableLock lock = lockManager.writeLock(LockLevel.DIR,
+ bpid, replicaInfo.getStorageUuid(),
+ datasetSubLockStrategy.blockIdToSubLock(replicaInfo.getBlockId()))) {
// If the block is cached, start uncaching it.
if (replicaInfo.getState() != ReplicaState.FINALIZED) {
throw new IOException("Only a Finalized replica can be appended to; "
@@ -1530,8 +1555,9 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl>
{
while (true) {
try {
- try (AutoCloseableLock lock =
lockManager.writeLock(LockLevel.BLOCK_POOl,
- b.getBlockPoolId())) {
+ try (AutoCloseableLock lock = lockManager.writeLock(LockLevel.DIR,
+ b.getBlockPoolId(), getStorageUuidForLock(b),
+ datasetSubLockStrategy.blockIdToSubLock(b.getBlockId()))) {
ReplicaInfo replicaInfo = recoverCheck(b, newGS, expectedBlockLen);
FsVolumeReference ref = replicaInfo.getVolume().obtainReference();
ReplicaInPipeline replica;
@@ -1564,8 +1590,9 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl>
{
b, newGS, expectedBlockLen);
while (true) {
try {
- try (AutoCloseableLock lock = lockManager.writeLock(LockLevel.VOLUME,
- b.getBlockPoolId(), getStorageUuidForLock(b))) {
+ try (AutoCloseableLock lock = lockManager.writeLock(LockLevel.DIR,
+ b.getBlockPoolId(), getStorageUuidForLock(b),
+ datasetSubLockStrategy.blockIdToSubLock(b.getBlockId()))) {
// check replica's state
ReplicaInfo replicaInfo = recoverCheck(b, newGS, expectedBlockLen);
// bump the replica's GS
@@ -1650,8 +1677,9 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl>
{
}
ReplicaInPipeline newReplicaInfo;
- try (AutoCloseableLock l = lockManager.writeLock(LockLevel.VOLUME,
- b.getBlockPoolId(), v.getStorageID())) {
+ try (AutoCloseableLock l = lockManager.writeLock(LockLevel.DIR,
+ b.getBlockPoolId(), v.getStorageID(),
+ datasetSubLockStrategy.blockIdToSubLock(b.getBlockId()))) {
newReplicaInfo = v.createRbw(b);
if (newReplicaInfo.getReplicaInfo().getState() != ReplicaState.RBW) {
throw new IOException("CreateRBW returned a replica of state "
@@ -1681,8 +1709,9 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl>
{
try {
while (true) {
try {
- try (AutoCloseableLock lock = lockManager.writeLock(LockLevel.VOLUME,
- b.getBlockPoolId(), getStorageUuidForLock(b))) {
+ try (AutoCloseableLock lock = lockManager.writeLock(LockLevel.DIR,
+ b.getBlockPoolId(), getStorageUuidForLock(b),
+ datasetSubLockStrategy.blockIdToSubLock(b.getBlockId()))) {
ReplicaInfo replicaInfo =
getReplicaInfo(b.getBlockPoolId(), b.getBlockId());
// check the replica's state
@@ -1713,8 +1742,9 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl>
{
private ReplicaHandler recoverRbwImpl(ReplicaInPipeline rbw,
ExtendedBlock b, long newGS, long minBytesRcvd, long maxBytesRcvd)
throws IOException {
- try (AutoCloseableLock lock = lockManager.writeLock(LockLevel.VOLUME,
- b.getBlockPoolId(), getStorageUuidForLock(b))) {
+ try (AutoCloseableLock lock = lockManager.writeLock(LockLevel.DIR,
+ b.getBlockPoolId(), getStorageUuidForLock(b),
+ datasetSubLockStrategy.blockIdToSubLock(b.getBlockId()))) {
// check generation stamp
long replicaGenerationStamp = rbw.getGenerationStamp();
if (replicaGenerationStamp < b.getGenerationStamp() ||
@@ -1775,8 +1805,9 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl>
{
public ReplicaInPipeline convertTemporaryToRbw(
final ExtendedBlock b) throws IOException {
long startTimeMs = Time.monotonicNow();
- try (AutoCloseableLock lock = lockManager.writeLock(LockLevel.VOLUME,
- b.getBlockPoolId(), getStorageUuidForLock(b))) {
+ try (AutoCloseableLock lock = lockManager.writeLock(LockLevel.DIR,
+ b.getBlockPoolId(), getStorageUuidForLock(b),
+ datasetSubLockStrategy.blockIdToSubLock(b.getBlockId()))) {
final long blockId = b.getBlockId();
final long expectedGs = b.getGenerationStamp();
final long visible = b.getNumBytes();
@@ -1915,8 +1946,9 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl>
{
.getNumBytes());
FsVolumeImpl v = (FsVolumeImpl) ref.getVolume();
ReplicaInPipeline newReplicaInfo;
- try (AutoCloseableLock lock = lockManager.writeLock(LockLevel.VOLUME,
- b.getBlockPoolId(), v.getStorageID())) {
+ try (AutoCloseableLock lock = lockManager.writeLock(LockLevel.DIR,
+ b.getBlockPoolId(), v.getStorageID(),
+ datasetSubLockStrategy.blockIdToSubLock(b.getBlockId()))) {
try {
newReplicaInfo = v.createTemporary(b);
LOG.debug("creating temporary for block: {} on volume: {}",
@@ -1973,8 +2005,9 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl>
{
ReplicaInfo replicaInfo = null;
ReplicaInfo finalizedReplicaInfo = null;
long startTimeMs = Time.monotonicNow();
- try (AutoCloseableLock lock = lockManager.writeLock(LockLevel.VOLUME,
- b.getBlockPoolId(), getStorageUuidForLock(b))) {
+ try (AutoCloseableLock lock = lockManager.writeLock(LockLevel.DIR,
+ b.getBlockPoolId(), getStorageUuidForLock(b),
+ datasetSubLockStrategy.blockIdToSubLock(b.getBlockId()))) {
if (Thread.interrupted()) {
// Don't allow data modifications from interrupted threads
throw new IOException("Cannot finalize block: " + b + " from
Interrupted Thread");
@@ -2010,8 +2043,9 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl>
{
private ReplicaInfo finalizeReplica(String bpid, ReplicaInfo replicaInfo)
throws IOException {
- try (AutoCloseableLock lock = lockManager.writeLock(LockLevel.VOLUME,
- bpid, replicaInfo.getStorageUuid())) {
+ try (AutoCloseableLock lock = lockManager.writeLock(LockLevel.DIR,
+ bpid, replicaInfo.getStorageUuid(),
+ datasetSubLockStrategy.blockIdToSubLock(replicaInfo.getBlockId()))) {
// Compare generation stamp of old and new replica before finalizing
if (volumeMap.get(bpid, replicaInfo.getBlockId()).getGenerationStamp()
> replicaInfo.getGenerationStamp()) {
@@ -2060,8 +2094,9 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl>
{
@Override // FsDatasetSpi
public void unfinalizeBlock(ExtendedBlock b) throws IOException {
long startTimeMs = Time.monotonicNow();
- try (AutoCloseableLock lock = lockManager.writeLock(LockLevel.VOLUME,
- b.getBlockPoolId(), getStorageUuidForLock(b))) {
+ try (AutoCloseableLock lock = lockManager.writeLock(LockLevel.DIR,
+ b.getBlockPoolId(), getStorageUuidForLock(b),
+ datasetSubLockStrategy.blockIdToSubLock(b.getBlockId()))) {
ReplicaInfo replicaInfo = volumeMap.get(b.getBlockPoolId(),
b.getLocalBlock());
if (replicaInfo != null &&
@@ -2459,7 +2494,8 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl>
{
final String bpid = block.getBlockPoolId();
final Block localBlock = block.getLocalBlock();
final long blockId = localBlock.getBlockId();
- try (AutoCloseableLock lock = lockManager.writeLock(LockLevel.BLOCK_POOl,
bpid)) {
+ try (AutoCloseableLock lock = lockManager.writeLock(LockLevel.DIR, bpid,
volume.getStorageID(),
+ datasetSubLockStrategy.blockIdToSubLock(blockId))) {
final ReplicaInfo info = volumeMap.get(bpid, localBlock);
if (info == null) {
ReplicaInfo infoByBlockId = volumeMap.get(bpid, blockId);
@@ -2548,8 +2584,8 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl>
{
bpid + ": ReplicaInfo not found.");
return;
}
- try (AutoCloseableLock lock = lockManager.writeLock(LockLevel.VOLUME, bpid,
- info.getStorageUuid())) {
+ try (AutoCloseableLock lock = lockManager.writeLock(LockLevel.DIR, bpid,
+ info.getStorageUuid(),
datasetSubLockStrategy.blockIdToSubLock(blockId))) {
boolean success = false;
try {
info = volumeMap.get(bpid, blockId);
@@ -2746,7 +2782,8 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl>
{
lastDirScannerNotifyTime = startTimeMs;
}
String storageUuid = vol.getStorageID();
- try (AutoCloseableLock lock = lockManager.writeLock(LockLevel.VOLUME,
bpid, storageUuid)) {
+ try (AutoCloseableLock lock = lockManager.writeLock(LockLevel.DIR, bpid,
+ vol.getStorageID(), datasetSubLockStrategy.blockIdToSubLock(blockId)))
{
if (!storageMap.containsKey(storageUuid)) {
// Storage was already removed
return;
@@ -3231,8 +3268,9 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl>
{
@Override // FsDatasetSpi
public long getReplicaVisibleLength(final ExtendedBlock block)
throws IOException {
- try (AutoCloseableLock lock = lockManager.readLock(LockLevel.BLOCK_POOl,
- block.getBlockPoolId())) {
+ try (AutoCloseableLock lock = lockManager.readLock(LockLevel.DIR,
+ block.getBlockPoolId(), getStorageUuidForLock(block),
+ datasetSubLockStrategy.blockIdToSubLock(block.getBlockId()))) {
final Replica replica = getReplicaInfo(block.getBlockPoolId(),
block.getBlockId());
if (replica.getGenerationStamp() < block.getGenerationStamp()) {
@@ -3259,6 +3297,12 @@ class FsDatasetImpl implements
FsDatasetSpi<FsVolumeImpl> {
Set<String> vols = storageMap.keySet();
for (String v : vols) {
lockManager.addLock(LockLevel.VOLUME, bpid, v);
+ List<String> allSubDirNameForDataSetLock =
datasetSubLockStrategy.getAllSubLockName();
+ for (String dir : allSubDirNameForDataSetLock) {
+ lockManager.addLock(LockLevel.DIR, bpid, v, dir);
+ LOG.info("Added DIR lock for bpid:{}, volume storageid:{}, dir:{}",
+ bpid, v, dir);
+ }
}
}
try {
@@ -3386,8 +3430,9 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl>
{
@Override // FsDatasetSpi
public BlockLocalPathInfo getBlockLocalPathInfo(ExtendedBlock block)
throws IOException {
- try (AutoCloseableLock lock = lockManager.readLock(LockLevel.BLOCK_POOl,
- block.getBlockPoolId())) {
+ try (AutoCloseableLock lock = lockManager.readLock(LockLevel.DIR,
+ block.getBlockPoolId(), getStorageUuidForLock(block),
+ datasetSubLockStrategy.blockIdToSubLock(block.getBlockId()))) {
final Replica replica = volumeMap.get(block.getBlockPoolId(),
block.getBlockId());
if (replica == null) {
diff --git
a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
index 2ab25f8329c..6bfed9a2904 100755
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
@@ -6568,6 +6568,15 @@
problem. In produce default set false, because it's have little
performance loss.
</description>
</property>
+
+ <property>
+ <name>dfs.datanode.dataset.sublock.count</name>
+ <value>1000</value>
+ <description>
+ The number of dataset read/write sub locks for a volume.
+ </description>
+ </property>
+
<property>
<name>dfs.client.fsck.connect.timeout</name>
<value>60000ms</value>
diff --git
a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataSetLockManager.java
b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataSetLockManager.java
index b514accdf16..6cb12d2681f 100644
---
a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataSetLockManager.java
+++
b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataSetLockManager.java
@@ -37,6 +37,7 @@ public class TestDataSetLockManager {
public void testBaseFunc() {
manager.addLock(LockLevel.BLOCK_POOl, "BPtest");
manager.addLock(LockLevel.VOLUME, "BPtest", "Volumetest");
+ manager.addLock(LockLevel.DIR, "BPtest", "Volumetest", "SubDirtest");
AutoCloseDataSetLock lock = manager.writeLock(LockLevel.BLOCK_POOl,
"BPtest");
AutoCloseDataSetLock lock1 = manager.readLock(LockLevel.BLOCK_POOl,
"BPtest");
@@ -62,6 +63,16 @@ public class TestDataSetLockManager {
manager.lockLeakCheck();
assertNull(manager.getLastException());
+ AutoCloseDataSetLock lock6 = manager.writeLock(LockLevel.BLOCK_POOl,
"BPtest");
+ AutoCloseDataSetLock lock7 = manager.readLock(LockLevel.VOLUME, "BPtest",
"Volumetest");
+ AutoCloseDataSetLock lock8 = manager.readLock(LockLevel.DIR,
+ "BPtest", "Volumetest", "SubDirtest");
+ lock8.close();
+ lock7.close();
+ lock6.close();
+ manager.lockLeakCheck();
+ assertNull(manager.getLastException());
+
manager.writeLock(LockLevel.VOLUME, "BPtest", "Volumetest");
manager.lockLeakCheck();
diff --git
a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsDatasetImpl.java
b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsDatasetImpl.java
index 975874edb1f..f58ee729ef9 100644
---
a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsDatasetImpl.java
+++
b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsDatasetImpl.java
@@ -1946,7 +1946,12 @@ public class TestFsDatasetImpl {
assertFalse(uuids.contains(dn.getDatanodeUuid()));
// This replica has deleted from datanode memory.
- assertNull(ds.getStoredBlock(bpid, extendedBlock.getBlockId()));
+ try {
+ Block storedBlock = ds.getStoredBlock(bpid,
extendedBlock.getBlockId());
+ assertNull(storedBlock);
+ } catch (Exception e) {
+ GenericTestUtils.assertExceptionContains("ReplicaNotFoundException",
e);
+ }
} finally {
cluster.shutdown();
DataNodeFaultInjector.set(oldInjector);
diff --git
a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestDNFencing.java
b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestDNFencing.java
index 9d79e496102..2846c16c220 100644
---
a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestDNFencing.java
+++
b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestDNFencing.java
@@ -50,6 +50,7 @@ import
org.apache.hadoop.hdfs.server.blockmanagement.DatanodeStorageInfo;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
import org.apache.hadoop.hdfs.server.datanode.InternalDataNodeTestUtils;
+import org.apache.hadoop.hdfs.server.datanode.ReplicaNotFoundException;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter;
import org.apache.hadoop.io.IOUtils;
@@ -596,9 +597,13 @@ public class TestDNFencing {
throws IOException {
int count = 0;
for (DataNode dn : cluster.getDataNodes()) {
- if (DataNodeTestUtils.getFSDataset(dn).getStoredBlock(
- block.getBlockPoolId(), block.getBlockId()) != null) {
- count++;
+ try {
+ if (DataNodeTestUtils.getFSDataset(dn).getStoredBlock(
+ block.getBlockPoolId(), block.getBlockId()) != null) {
+ count++;
+ }
+ } catch (ReplicaNotFoundException e) {
+ continue;
}
}
return count;
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]