HDFS-7999. FsDatasetImpl#createTemporary sometimes holds the FsDatasetImpl lock for a very long time (sinago via cmccabe)
(cherry picked from commit 28bebc81db8bb6d1bc2574de7564fe4c595cfe09) (cherry picked from commit a827089905524e10638c783ba908a895d621911d) Conflicts: hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/c3a3092c Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/c3a3092c Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/c3a3092c Branch: refs/heads/sjlee/hdfs-merge Commit: c3a3092c37926eca75ea149c4c061742f6599b40 Parents: c6b68a8 Author: Colin Patrick Mccabe <cmcc...@cloudera.com> Authored: Mon Apr 6 08:54:46 2015 -0700 Committer: Sangjin Lee <sj...@apache.org> Committed: Thu Aug 13 11:17:20 2015 -0700 ---------------------------------------------------------------------- .../datanode/fsdataset/impl/FsDatasetImpl.java | 67 +++++++++++++------- 1 file changed, 44 insertions(+), 23 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/c3a3092c/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java index f24d644..e352ea3 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java @@ -1180,30 +1180,51 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> { } @Override // FsDatasetSpi - public synchronized 
ReplicaInPipeline createTemporary(StorageType storageType, - ExtendedBlock b) throws IOException { - ReplicaInfo replicaInfo = volumeMap.get(b.getBlockPoolId(), b.getBlockId()); - if (replicaInfo != null) { - if (replicaInfo.getGenerationStamp() < b.getGenerationStamp() - && replicaInfo instanceof ReplicaInPipeline) { - // Stop the previous writer - ((ReplicaInPipeline)replicaInfo) - .stopWriter(datanode.getDnConf().getXceiverStopTimeout()); - invalidate(b.getBlockPoolId(), new Block[]{replicaInfo}); - } else { - throw new ReplicaAlreadyExistsException("Block " + b + - " already exists in state " + replicaInfo.getState() + - " and thus cannot be created."); + public ReplicaInPipeline createTemporary( + StorageType storageType, ExtendedBlock b) throws IOException { + long startTimeMs = Time.monotonicNow(); + long writerStopTimeoutMs = datanode.getDnConf().getXceiverStopTimeout(); + ReplicaInfo lastFoundReplicaInfo = null; + do { + synchronized (this) { + ReplicaInfo currentReplicaInfo = + volumeMap.get(b.getBlockPoolId(), b.getBlockId()); + if (currentReplicaInfo == lastFoundReplicaInfo) { + if (lastFoundReplicaInfo != null) { + invalidate(b.getBlockPoolId(), new Block[] { lastFoundReplicaInfo }); + } + FsVolumeImpl v = volumes.getNextVolume(storageType, b.getNumBytes()); + // create a temporary file to hold block in the designated volume + File f = v.createTmpFile(b.getBlockPoolId(), b.getLocalBlock()); + ReplicaInPipeline newReplicaInfo = + new ReplicaInPipeline(b.getBlockId(), b.getGenerationStamp(), v, + f.getParentFile(), 0); + volumeMap.add(b.getBlockPoolId(), newReplicaInfo); + return newReplicaInfo; + } else { + if (!(currentReplicaInfo.getGenerationStamp() < b + .getGenerationStamp() && currentReplicaInfo instanceof ReplicaInPipeline)) { + throw new ReplicaAlreadyExistsException("Block " + b + + " already exists in state " + currentReplicaInfo.getState() + + " and thus cannot be created."); + } + lastFoundReplicaInfo = currentReplicaInfo; + } } - } - - 
FsVolumeImpl v = volumes.getNextVolume(storageType, b.getNumBytes()); - // create a temporary file to hold block in the designated volume - File f = v.createTmpFile(b.getBlockPoolId(), b.getLocalBlock()); - ReplicaInPipeline newReplicaInfo = new ReplicaInPipeline(b.getBlockId(), - b.getGenerationStamp(), v, f.getParentFile(), 0); - volumeMap.add(b.getBlockPoolId(), newReplicaInfo); - return newReplicaInfo; + + // Hang too long, just bail out. This is not supposed to happen. + long writerStopMs = Time.monotonicNow() - startTimeMs; + if (writerStopMs > writerStopTimeoutMs) { + LOG.warn("Unable to stop existing writer for block " + b + " after " + + writerStopMs + " miniseconds."); + throw new IOException("Unable to stop existing writer for block " + b + + " after " + writerStopMs + " miniseconds."); + } + + // Stop the previous writer + ((ReplicaInPipeline) lastFoundReplicaInfo) + .stopWriter(writerStopTimeoutMs); + } while (true); } /**