Author: cmccabe Date: Thu Feb 13 03:10:48 2014 New Revision: 1567835 URL: http://svn.apache.org/r1567835 Log: HDFS-5940. Minor cleanups to ShortCircuitReplica, FsDatasetCache, and DomainSocketWatcher (cmccabe)
Added: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/ExtendedBlockId.java Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderFactory.java hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/client/ShortCircuitCache.java hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/client/ShortCircuitReplica.java hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetCache.java hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/fs/TestEnhancedByteBufferAccess.java hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestBlockReaderLocal.java hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestShortCircuitCache.java Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt?rev=1567835&r1=1567834&r2=1567835&view=diff ============================================================================== --- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt (original) +++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt Thu Feb 13 03:10:48 2014 @@ -373,6 +373,9 @@ Release 2.4.0 - UNRELEASED HDFS-5810. Unify mmap cache and short-circuit file descriptor cache (cmccabe) + HDFS-5940. Minor cleanups to ShortCircuitReplica, FsDatasetCache, and + DomainSocketWatcher (cmccabe) + OPTIMIZATIONS HDFS-5790. LeaseManager.findPath is very slow when many leases need recovery Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderFactory.java URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderFactory.java?rev=1567835&r1=1567834&r2=1567835&view=diff ============================================================================== --- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderFactory.java (original) +++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderFactory.java Thu Feb 13 03:10:48 2014 @@ -31,7 +31,6 @@ import org.apache.hadoop.conf.Configurat import org.apache.hadoop.hdfs.client.ShortCircuitCache; import org.apache.hadoop.hdfs.client.ShortCircuitCache.ShortCircuitReplicaCreator; import org.apache.hadoop.hdfs.client.ShortCircuitReplica; -import org.apache.hadoop.hdfs.client.ShortCircuitReplica.Key; import org.apache.hadoop.hdfs.client.ShortCircuitReplicaInfo; import org.apache.hadoop.hdfs.net.DomainPeer; import org.apache.hadoop.hdfs.net.Peer; @@ -389,7 +388,7 @@ public class BlockReaderFactory implemen return null; } ShortCircuitCache cache = clientContext.getShortCircuitCache(); - Key key = new Key(block.getBlockId(), block.getBlockPoolId()); + ExtendedBlockId key = new ExtendedBlockId(block.getBlockId(), block.getBlockPoolId()); ShortCircuitReplicaInfo info = cache.fetchOrCreate(key, this); InvalidToken exc = info.getInvalidTokenException(); if (exc != null) { @@ -492,7 +491,7 @@ public class BlockReaderFactory implemen sock.recvFileInputStreams(fis, buf, 0, buf.length); ShortCircuitReplica replica = null; try { - Key key = new Key(block.getBlockId(), block.getBlockPoolId()); + ExtendedBlockId key = new ExtendedBlockId(block.getBlockId(), block.getBlockPoolId()); replica = new ShortCircuitReplica(key, fis[0], fis[1], clientContext.getShortCircuitCache(), Time.monotonicNow()); } catch (IOException e) { Added: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/ExtendedBlockId.java URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/ExtendedBlockId.java?rev=1567835&view=auto ============================================================================== --- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/ExtendedBlockId.java (added) +++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/ExtendedBlockId.java Thu Feb 13 03:10:48 2014 @@ -0,0 +1,75 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs; + +import org.apache.commons.lang.builder.EqualsBuilder; +import org.apache.commons.lang.builder.HashCodeBuilder; + +/** + * An immutable key which identifies a block. + */ +final public class ExtendedBlockId { + /** + * The block ID for this block. + */ + private final long blockId; + + /** + * The block pool ID for this block. + */ + private final String bpId; + + public ExtendedBlockId(long blockId, String bpId) { + this.blockId = blockId; + this.bpId = bpId; + } + + public long getBlockId() { + return this.blockId; + } + + public String getBlockPoolId() { + return this.bpId; + } + + @Override + public boolean equals(Object o) { + if ((o == null) || (o.getClass() != this.getClass())) { + return false; + } + ExtendedBlockId other = (ExtendedBlockId)o; + return new EqualsBuilder(). + append(blockId, other.blockId). + append(bpId, other.bpId). + isEquals(); + } + + @Override + public int hashCode() { + return new HashCodeBuilder(). + append(this.blockId). + append(this.bpId). + toHashCode(); + } + + @Override + public String toString() { + return new StringBuilder().append(blockId). + append("_").append(bpId).toString(); + } +} Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/client/ShortCircuitCache.java URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/client/ShortCircuitCache.java?rev=1567835&r1=1567834&r2=1567835&view=diff ============================================================================== --- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/client/ShortCircuitCache.java (original) +++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/client/ShortCircuitCache.java Thu Feb 13 03:10:48 2014 @@ -36,9 +36,9 @@ import java.util.concurrent.locks.Reentr import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hdfs.ExtendedBlockId; import org.apache.hadoop.hdfs.DFSConfigKeys; import org.apache.hadoop.hdfs.client.ShortCircuitReplica; -import org.apache.hadoop.hdfs.client.ShortCircuitReplica.Key; import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.ipc.RetriableException; import org.apache.hadoop.security.token.SecretManager.InvalidToken; @@ -183,8 +183,9 @@ public class ShortCircuitCache implement * ShortCircuitReplicaInfo objects may contain a replica, or an InvalidToken * exception. */ - private final HashMap<Key, Waitable<ShortCircuitReplicaInfo>> - replicaInfoMap = new HashMap<Key, Waitable<ShortCircuitReplicaInfo>>(); + private final HashMap<ExtendedBlockId, Waitable<ShortCircuitReplicaInfo>> + replicaInfoMap = new HashMap<ExtendedBlockId, + Waitable<ShortCircuitReplicaInfo>>(); /** * The CacheCleaner. We don't create this and schedule it until it becomes @@ -566,7 +567,7 @@ public class ShortCircuitCache implement * @return Null if no replica could be found or created. * The replica, otherwise. */ - public ShortCircuitReplicaInfo fetchOrCreate(Key key, + public ShortCircuitReplicaInfo fetchOrCreate(ExtendedBlockId key, ShortCircuitReplicaCreator creator) { Waitable<ShortCircuitReplicaInfo> newWaitable = null; lock.lock(); @@ -612,7 +613,7 @@ public class ShortCircuitCache implement * * @throws RetriableException If the caller needs to retry. */ - private ShortCircuitReplicaInfo fetch(Key key, + private ShortCircuitReplicaInfo fetch(ExtendedBlockId key, Waitable<ShortCircuitReplicaInfo> waitable) throws RetriableException { // Another thread is already in the process of loading this // ShortCircuitReplica. So we simply wait for it to complete. @@ -656,7 +657,7 @@ public class ShortCircuitCache implement return info; } - private ShortCircuitReplicaInfo create(Key key, + private ShortCircuitReplicaInfo create(ExtendedBlockId key, ShortCircuitReplicaCreator creator, Waitable<ShortCircuitReplicaInfo> newWaitable) { // Handle loading a new replica. @@ -805,8 +806,8 @@ public class ShortCircuitCache implement @VisibleForTesting // ONLY for testing public interface CacheVisitor { void visit(int numOutstandingMmaps, - Map<Key, ShortCircuitReplica> replicas, - Map<Key, InvalidToken> failedLoads, + Map<ExtendedBlockId, ShortCircuitReplica> replicas, + Map<ExtendedBlockId, InvalidToken> failedLoads, Map<Long, ShortCircuitReplica> evictable, Map<Long, ShortCircuitReplica> evictableMmapped); } @@ -815,11 +816,11 @@ public class ShortCircuitCache implement public void accept(CacheVisitor visitor) { lock.lock(); try { - Map<Key, ShortCircuitReplica> replicas = - new HashMap<Key, ShortCircuitReplica>(); - Map<Key, InvalidToken> failedLoads = - new HashMap<Key, InvalidToken>(); - for (Entry<Key, Waitable<ShortCircuitReplicaInfo>> entry : + Map<ExtendedBlockId, ShortCircuitReplica> replicas = + new HashMap<ExtendedBlockId, ShortCircuitReplica>(); + Map<ExtendedBlockId, InvalidToken> failedLoads = + new HashMap<ExtendedBlockId, InvalidToken>(); + for (Entry<ExtendedBlockId, Waitable<ShortCircuitReplicaInfo>> entry : replicaInfoMap.entrySet()) { Waitable<ShortCircuitReplicaInfo> waitable = entry.getValue(); if (waitable.hasVal()) { @@ -839,13 +840,13 @@ public class ShortCircuitCache implement append("with outstandingMmapCount=").append(outstandingMmapCount). append(", replicas="); String prefix = ""; - for (Entry<Key, ShortCircuitReplica> entry : replicas.entrySet()) { + for (Entry<ExtendedBlockId, ShortCircuitReplica> entry : replicas.entrySet()) { builder.append(prefix).append(entry.getValue()); prefix = ","; } prefix = ""; builder.append(", failedLoads="); - for (Entry<Key, InvalidToken> entry : failedLoads.entrySet()) { + for (Entry<ExtendedBlockId, InvalidToken> entry : failedLoads.entrySet()) { builder.append(prefix).append(entry.getValue()); prefix = ","; } Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/client/ShortCircuitReplica.java URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/client/ShortCircuitReplica.java?rev=1567835&r1=1567834&r2=1567835&view=diff ============================================================================== --- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/client/ShortCircuitReplica.java (original) +++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/client/ShortCircuitReplica.java Thu Feb 13 03:10:48 2014 @@ -25,10 +25,9 @@ import java.nio.MappedByteBuffer; import java.nio.channels.FileChannel; import java.nio.channels.FileChannel.MapMode; -import org.apache.commons.lang.builder.EqualsBuilder; -import org.apache.commons.lang.builder.HashCodeBuilder; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hdfs.ExtendedBlockId; import org.apache.hadoop.hdfs.server.datanode.BlockMetadataHeader; import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.io.nativeio.NativeIO; @@ -50,64 +49,9 @@ public class ShortCircuitReplica { public static final Log LOG = LogFactory.getLog(ShortCircuitCache.class); /** - * Immutable class which identifies a ShortCircuitReplica object. - */ - public static final class Key { - public Key(long blockId, String bpId) { - this.blockId = blockId; - this.bpId = bpId; - } - - public long getBlockId() { - return this.blockId; - } - - public String getBlockPoolId() { - return this.bpId; - } - - @Override - public boolean equals(Object o) { - if ((o == null) || (o.getClass() != this.getClass())) { - return false; - } - Key other = (Key)o; - return new EqualsBuilder(). - append(blockId, other.blockId). - append(bpId, other.bpId). - isEquals(); - } - - @Override - public int hashCode() { - return new HashCodeBuilder(). - append(this.blockId). - append(this.bpId). - toHashCode(); - } - - @Override - public String toString() { - return new StringBuilder().append(blockId). - append("_").append(bpId).toString(); - } - - /** - * The block ID for this BlockDescriptors object. - */ - private final long blockId; - - /** - * The block pool ID for this BlockDescriptors object. - */ - private final String bpId; - } - - - /** * Identifies this ShortCircuitReplica object. */ - final Key key; + final ExtendedBlockId key; /** * The block data input stream. @@ -168,7 +112,7 @@ public class ShortCircuitReplica { */ private Long evictableTimeNs = null; - public ShortCircuitReplica(Key key, + public ShortCircuitReplica(ExtendedBlockId key, FileInputStream dataStream, FileInputStream metaStream, ShortCircuitCache cache, long creationTimeMs) throws IOException { this.key = key; @@ -262,7 +206,7 @@ public class ShortCircuitReplica { return metaHeader; } - public Key getKey() { + public ExtendedBlockId getKey() { return key; } Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetCache.java URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetCache.java?rev=1567835&r1=1567834&r2=1567835&view=diff ============================================================================== --- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetCache.java (original) +++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetCache.java Thu Feb 13 03:10:48 2014 @@ -37,12 +37,12 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; import org.apache.commons.io.IOUtils; -import org.apache.commons.lang.builder.HashCodeBuilder; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.fs.ChecksumException; +import org.apache.hadoop.hdfs.ExtendedBlockId; import org.apache.hadoop.hdfs.DFSConfigKeys; import org.apache.hadoop.hdfs.protocol.BlockListAsLongs; import org.apache.hadoop.hdfs.protocol.ExtendedBlock; @@ -57,43 +57,6 @@ import org.apache.hadoop.io.nativeio.Nat @InterfaceStability.Unstable public class FsDatasetCache { /** - * Keys which identify MappableBlocks. - */ - private static final class Key { - /** - * Block id. - */ - final long id; - - /** - * Block pool id. - */ - final String bpid; - - Key(long id, String bpid) { - this.id = id; - this.bpid = bpid; - } - - @Override - public boolean equals(Object o) { - if (o == null) { - return false; - } - if (!(o.getClass() == getClass())) { - return false; - } - Key other = (Key)o; - return ((other.id == this.id) && (other.bpid.equals(this.bpid))); - } - - @Override - public int hashCode() { - return new HashCodeBuilder().append(id).append(bpid).hashCode(); - } - }; - - /** * MappableBlocks that we know about. */ private static final class Value { @@ -143,7 +106,8 @@ public class FsDatasetCache { /** * Stores MappableBlock objects and the states they're in. */ - private final HashMap<Key, Value> mappableBlockMap = new HashMap<Key, Value>(); + private final HashMap<ExtendedBlockId, Value> mappableBlockMap = + new HashMap<ExtendedBlockId, Value>(); private final AtomicLong numBlocksCached = new AtomicLong(0); @@ -260,12 +224,12 @@ public class FsDatasetCache { */ synchronized List<Long> getCachedBlocks(String bpid) { List<Long> blocks = new ArrayList<Long>(); - for (Iterator<Entry<Key, Value>> iter = + for (Iterator<Entry<ExtendedBlockId, Value>> iter = mappableBlockMap.entrySet().iterator(); iter.hasNext(); ) { - Entry<Key, Value> entry = iter.next(); - if (entry.getKey().bpid.equals(bpid)) { + Entry<ExtendedBlockId, Value> entry = iter.next(); + if (entry.getKey().getBlockPoolId().equals(bpid)) { if (entry.getValue().state.shouldAdvertise()) { - blocks.add(entry.getKey().id); + blocks.add(entry.getKey().getBlockId()); } } } @@ -278,7 +242,7 @@ public class FsDatasetCache { synchronized void cacheBlock(long blockId, String bpid, String blockFileName, long length, long genstamp, Executor volumeExecutor) { - Key key = new Key(blockId, bpid); + ExtendedBlockId key = new ExtendedBlockId(blockId, bpid); Value prevValue = mappableBlockMap.get(key); if (prevValue != null) { if (LOG.isDebugEnabled()) { @@ -299,7 +263,7 @@ public class FsDatasetCache { } synchronized void uncacheBlock(String bpid, long blockId) { - Key key = new Key(blockId, bpid); + ExtendedBlockId key = new ExtendedBlockId(blockId, bpid); Value prevValue = mappableBlockMap.get(key); if (prevValue == null) { @@ -344,12 +308,12 @@ public class FsDatasetCache { * Background worker that mmaps, mlocks, and checksums a block */ private class CachingTask implements Runnable { - private final Key key; + private final ExtendedBlockId key; private final String blockFileName; private final long length; private final long genstamp; - CachingTask(Key key, String blockFileName, long length, long genstamp) { + CachingTask(ExtendedBlockId key, String blockFileName, long length, long genstamp) { this.key = key; this.blockFileName = blockFileName; this.length = length; @@ -361,13 +325,13 @@ public class FsDatasetCache { boolean success = false; FileInputStream blockIn = null, metaIn = null; MappableBlock mappableBlock = null; - ExtendedBlock extBlk = - new ExtendedBlock(key.bpid, key.id, length, genstamp); + ExtendedBlock extBlk = new ExtendedBlock(key.getBlockPoolId(), + key.getBlockId(), length, genstamp); long newUsedBytes = usedBytesCount.reserve(length); if (newUsedBytes < 0) { - LOG.warn("Failed to cache block id " + key.id + ", pool " + key.bpid + - ": could not reserve " + length + " more bytes in the " + - "cache: " + DFSConfigKeys.DFS_DATANODE_MAX_LOCKED_MEMORY_KEY + + LOG.warn("Failed to cache " + key + ": could not reserve " + length + + " more bytes in the cache: " + + DFSConfigKeys.DFS_DATANODE_MAX_LOCKED_MEMORY_KEY + " of " + maxBytes + " exceeded."); numBlocksFailedToCache.incrementAndGet(); return; @@ -378,16 +342,15 @@ public class FsDatasetCache { metaIn = (FileInputStream)dataset.getMetaDataInputStream(extBlk) .getWrappedStream(); } catch (ClassCastException e) { - LOG.warn("Failed to cache block with id " + key.id + ", pool " + - key.bpid + ": Underlying blocks are not backed by files.", e); + LOG.warn("Failed to cache " + key + + ": Underlying blocks are not backed by files.", e); return; } catch (FileNotFoundException e) { - LOG.info("Failed to cache block with id " + key.id + ", pool " + - key.bpid + ": failed to find backing files."); + LOG.info("Failed to cache " + key + ": failed to find backing " + + "files."); return; } catch (IOException e) { - LOG.warn("Failed to cache block with id " + key.id + ", pool " + - key.bpid + ": failed to open file", e); + LOG.warn("Failed to cache " + key + ": failed to open file", e); return; } try { @@ -395,11 +358,10 @@ public class FsDatasetCache { load(length, blockIn, metaIn, blockFileName); } catch (ChecksumException e) { // Exception message is bogus since this wasn't caused by a file read - LOG.warn("Failed to cache block " + key.id + " in " + key.bpid + ": " + - "checksum verification failed."); + LOG.warn("Failed to cache " + key + ": checksum verification failed."); return; } catch (IOException e) { - LOG.warn("Failed to cache block " + key.id + " in " + key.bpid, e); + LOG.warn("Failed to cache " + key, e); return; } synchronized (FsDatasetCache.this) { @@ -409,15 +371,14 @@ public class FsDatasetCache { value.state == State.CACHING_CANCELLED); if (value.state == State.CACHING_CANCELLED) { mappableBlockMap.remove(key); - LOG.warn("Caching of block " + key.id + " in " + key.bpid + - " was cancelled."); + LOG.warn("Caching of " + key + " was cancelled."); return; } mappableBlockMap.put(key, new Value(mappableBlock, State.CACHED)); } if (LOG.isDebugEnabled()) { - LOG.debug("Successfully cached block " + key.id + " in " + key.bpid + - ". We are now caching " + newUsedBytes + " bytes in total."); + LOG.debug("Successfully cached " + key + ". We are now caching " + + newUsedBytes + " bytes in total."); } numBlocksCached.addAndGet(1); success = true; @@ -425,9 +386,8 @@ public class FsDatasetCache { if (!success) { newUsedBytes = usedBytesCount.release(length); if (LOG.isDebugEnabled()) { - LOG.debug("Caching of block " + key.id + " in " + - key.bpid + " was aborted. We are now caching only " + - newUsedBytes + " + bytes in total."); + LOG.debug("Caching of " + key + " was aborted. We are now " + + "caching only " + newUsedBytes + " + bytes in total."); } IOUtils.closeQuietly(blockIn); IOUtils.closeQuietly(metaIn); @@ -445,9 +405,9 @@ public class FsDatasetCache { } private class UncachingTask implements Runnable { - private final Key key; + private final ExtendedBlockId key; - UncachingTask(Key key) { + UncachingTask(ExtendedBlockId key) { this.key = key; } @@ -470,8 +430,8 @@ public class FsDatasetCache { usedBytesCount.release(value.mappableBlock.getLength()); numBlocksCached.addAndGet(-1); if (LOG.isDebugEnabled()) { - LOG.debug("Uncaching of block " + key.id + " in " + key.bpid + - " completed. usedBytes = " + newUsedBytes); + LOG.debug("Uncaching of " + key + " completed. " + + "usedBytes = " + newUsedBytes); } } } Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/fs/TestEnhancedByteBufferAccess.java URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/fs/TestEnhancedByteBufferAccess.java?rev=1567835&r1=1567834&r2=1567835&view=diff ============================================================================== --- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/fs/TestEnhancedByteBufferAccess.java (original) +++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/fs/TestEnhancedByteBufferAccess.java Thu Feb 13 03:10:48 2014 @@ -34,7 +34,7 @@ import org.apache.commons.logging.LogFac import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hdfs.BlockReaderTestUtil; +import org.apache.hadoop.hdfs.ExtendedBlockId; import org.apache.hadoop.hdfs.ClientContext; import org.apache.hadoop.hdfs.DFSClient; import org.apache.hadoop.hdfs.DFSConfigKeys; @@ -46,7 +46,6 @@ import org.apache.hadoop.hdfs.client.Hdf import org.apache.hadoop.hdfs.client.ShortCircuitCache; import org.apache.hadoop.hdfs.client.ShortCircuitCache.CacheVisitor; import org.apache.hadoop.hdfs.client.ShortCircuitReplica; -import org.apache.hadoop.hdfs.client.ShortCircuitReplica.Key; import org.apache.hadoop.hdfs.protocol.ExtendedBlock; import org.apache.hadoop.io.ByteBufferPool; import org.apache.hadoop.io.IOUtils; @@ -275,8 +274,8 @@ public class TestEnhancedByteBufferAcces @Override public void visit(int numOutstandingMmaps, - Map<Key, ShortCircuitReplica> replicas, - Map<Key, InvalidToken> failedLoads, + Map<ExtendedBlockId, ShortCircuitReplica> replicas, + Map<ExtendedBlockId, InvalidToken> failedLoads, Map<Long, ShortCircuitReplica> evictable, Map<Long, ShortCircuitReplica> evictableMmapped) { if (expectedNumOutstandingMmaps >= 0) { @@ -341,12 +340,12 @@ public class TestEnhancedByteBufferAcces cache.accept(new CacheVisitor() { @Override public void visit(int numOutstandingMmaps, - Map<Key, ShortCircuitReplica> replicas, - Map<Key, InvalidToken> failedLoads, + Map<ExtendedBlockId, ShortCircuitReplica> replicas, + Map<ExtendedBlockId, InvalidToken> failedLoads, Map<Long, ShortCircuitReplica> evictable, Map<Long, ShortCircuitReplica> evictableMmapped) { ShortCircuitReplica replica = replicas.get( - new Key(firstBlock.getBlockId(), firstBlock.getBlockPoolId())); + new ExtendedBlockId(firstBlock.getBlockId(), firstBlock.getBlockPoolId())); Assert.assertNotNull(replica); Assert.assertTrue(replica.hasMmap()); // The replica should not yet be evictable, since we have it open. @@ -378,8 +377,8 @@ public class TestEnhancedByteBufferAcces cache.accept(new CacheVisitor() { @Override public void visit(int numOutstandingMmaps, - Map<Key, ShortCircuitReplica> replicas, - Map<Key, InvalidToken> failedLoads, + Map<ExtendedBlockId, ShortCircuitReplica> replicas, + Map<ExtendedBlockId, InvalidToken> failedLoads, Map<Long, ShortCircuitReplica> evictable, Map<Long, ShortCircuitReplica> evictableMmapped) { finished.setValue(evictableMmapped.isEmpty()); Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestBlockReaderLocal.java URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestBlockReaderLocal.java?rev=1567835&r1=1567834&r2=1567835&view=diff ============================================================================== --- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestBlockReaderLocal.java (original) +++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestBlockReaderLocal.java Thu Feb 13 03:10:48 2014 @@ -32,14 +32,12 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.hdfs.client.HdfsDataInputStream; import org.apache.hadoop.hdfs.client.ShortCircuitCache; import org.apache.hadoop.hdfs.client.ShortCircuitReplica; -import org.apache.hadoop.hdfs.client.ShortCircuitReplica.Key; import org.apache.hadoop.hdfs.protocol.DatanodeID; import org.apache.hadoop.hdfs.protocol.ExtendedBlock; import org.apache.hadoop.hdfs.server.datanode.CachingStrategy; import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.net.unix.DomainSocket; import org.apache.hadoop.net.unix.TemporarySocketDirectory; -import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.util.Time; import org.junit.AfterClass; import org.junit.Assert; @@ -170,7 +168,7 @@ public class TestBlockReaderLocal { }; dataIn = streams[0]; metaIn = streams[1]; - Key key = new Key(block.getBlockId(), block.getBlockPoolId()); + ExtendedBlockId key = new ExtendedBlockId(block.getBlockId(), block.getBlockPoolId()); ShortCircuitReplica replica = new ShortCircuitReplica( key, dataIn, metaIn, shortCircuitCache, Time.now()); blockReaderLocal = new BlockReaderLocal.Builder( Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestShortCircuitCache.java URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestShortCircuitCache.java?rev=1567835&r1=1567834&r2=1567835&view=diff ============================================================================== --- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestShortCircuitCache.java (original) +++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestShortCircuitCache.java Thu Feb 13 03:10:48 2014 @@ -20,17 +20,13 @@ package org.apache.hadoop.hdfs; import org.apache.commons.lang.mutable.MutableBoolean; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.hdfs.client.ClientMmap; import org.apache.hadoop.hdfs.client.ShortCircuitCache; -import org.apache.hadoop.hdfs.client.ShortCircuitCache.CacheVisitor; import org.apache.hadoop.hdfs.client.ShortCircuitCache.ShortCircuitReplicaCreator; import org.apache.hadoop.hdfs.client.ShortCircuitReplica; -import org.apache.hadoop.hdfs.client.ShortCircuitReplica.Key; import org.apache.hadoop.hdfs.client.ShortCircuitReplicaInfo; import org.apache.hadoop.hdfs.server.datanode.BlockMetadataHeader; import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.net.unix.TemporarySocketDirectory; -import org.apache.hadoop.security.token.SecretManager.InvalidToken; import org.apache.hadoop.test.GenericTestUtils; import org.apache.hadoop.util.DataChecksum; import org.apache.hadoop.util.Time; @@ -44,7 +40,6 @@ import java.io.DataOutputStream; import java.io.FileInputStream; import java.io.FileOutputStream; import java.io.IOException; -import java.util.Map; public class TestShortCircuitCache { static final Log LOG = LogFactory.getLog(TestShortCircuitCache.class); @@ -105,7 +100,7 @@ public class TestShortCircuitCache { @Override public ShortCircuitReplicaInfo createShortCircuitReplicaInfo() { try { - Key key = new Key(blockId, "test_bp1"); + ExtendedBlockId key = new ExtendedBlockId(blockId, "test_bp1"); return new ShortCircuitReplicaInfo( new ShortCircuitReplica(key, pair.getFileInputStreams()[0], pair.getFileInputStreams()[1], @@ -129,14 +124,14 @@ public class TestShortCircuitCache { new ShortCircuitCache(10, 10000000, 10, 10000000, 1, 10000); final TestFileDescriptorPair pair = new TestFileDescriptorPair(); ShortCircuitReplicaInfo replicaInfo1 = - cache.fetchOrCreate(new Key(123, "test_bp1"), + cache.fetchOrCreate(new ExtendedBlockId(123, "test_bp1"), new SimpleReplicaCreator(123, cache, pair)); Preconditions.checkNotNull(replicaInfo1.getReplica()); Preconditions.checkState(replicaInfo1.getInvalidTokenException() == null); pair.compareWith(replicaInfo1.getReplica().getDataStream(), replicaInfo1.getReplica().getMetaStream()); ShortCircuitReplicaInfo replicaInfo2 = - cache.fetchOrCreate(new Key(123, "test_bp1"), + cache.fetchOrCreate(new ExtendedBlockId(123, "test_bp1"), new ShortCircuitReplicaCreator() { @Override public ShortCircuitReplicaInfo createShortCircuitReplicaInfo() { @@ -157,7 +152,7 @@ public class TestShortCircuitCache { // really long here) ShortCircuitReplicaInfo replicaInfo3 = cache.fetchOrCreate( - new Key(123, "test_bp1"), new ShortCircuitReplicaCreator() { + new ExtendedBlockId(123, "test_bp1"), new ShortCircuitReplicaCreator() { @Override public ShortCircuitReplicaInfo createShortCircuitReplicaInfo() { Assert.fail("expected to use existing entry."); @@ -179,7 +174,7 @@ public class TestShortCircuitCache { final TestFileDescriptorPair pair = new TestFileDescriptorPair(); ShortCircuitReplicaInfo replicaInfo1 = cache.fetchOrCreate( - new Key(123, "test_bp1"), new SimpleReplicaCreator(123, cache, pair)); + new ExtendedBlockId(123, "test_bp1"), new SimpleReplicaCreator(123, cache, pair)); Preconditions.checkNotNull(replicaInfo1.getReplica()); Preconditions.checkState(replicaInfo1.getInvalidTokenException() == null); pair.compareWith(replicaInfo1.getReplica().getDataStream(), @@ -190,7 +185,7 @@ public class TestShortCircuitCache { Thread.sleep(10); ShortCircuitReplicaInfo replicaInfo2 = cache.fetchOrCreate( - new Key(123, "test_bp1"), new ShortCircuitReplicaCreator() { + new ExtendedBlockId(123, "test_bp1"), new ShortCircuitReplicaCreator() { @Override public ShortCircuitReplicaInfo createShortCircuitReplicaInfo() { triedToCreate.setValue(true); @@ -221,7 +216,7 @@ public class TestShortCircuitCache { }; for (int i = 0; i < pairs.length; i++) { replicaInfos[i] = cache.fetchOrCreate( - new Key(i, "test_bp1"), + new ExtendedBlockId(i, "test_bp1"), new SimpleReplicaCreator(i, cache, pairs[i])); Preconditions.checkNotNull(replicaInfos[i].getReplica()); Preconditions.checkState(replicaInfos[i].getInvalidTokenException() == null); @@ -237,7 +232,7 @@ public class TestShortCircuitCache { for (int i = 1; i < pairs.length; i++) { final Integer iVal = new Integer(i); replicaInfos[i] = cache.fetchOrCreate( - new Key(i, "test_bp1"), + new ExtendedBlockId(i, "test_bp1"), new ShortCircuitReplicaCreator() { @Override public ShortCircuitReplicaInfo createShortCircuitReplicaInfo() { @@ -253,7 +248,7 @@ public class TestShortCircuitCache { // The first (oldest) replica should not be cached. final MutableBoolean calledCreate = new MutableBoolean(false); replicaInfos[0] = cache.fetchOrCreate( - new Key(0, "test_bp1"), + new ExtendedBlockId(0, "test_bp1"), new ShortCircuitReplicaCreator() { @Override public ShortCircuitReplicaInfo createShortCircuitReplicaInfo() { @@ -289,7 +284,7 @@ public class TestShortCircuitCache { final long HOUR_IN_MS = 60 * 60 * 1000; for (int i = 0; i < pairs.length; i++) { final Integer iVal = new Integer(i); - final Key key = new Key(i, "test_bp1"); + final ExtendedBlockId key = new ExtendedBlockId(i, "test_bp1"); replicaInfos[i] = cache.fetchOrCreate(key, new ShortCircuitReplicaCreator() { @Override @@ -316,7 +311,7 @@ public class TestShortCircuitCache { @Override public Boolean get() { ShortCircuitReplicaInfo info = cache.fetchOrCreate( - new Key(0, "test_bp1"), new ShortCircuitReplicaCreator() { + new ExtendedBlockId(0, "test_bp1"), new ShortCircuitReplicaCreator() { @Override public ShortCircuitReplicaInfo createShortCircuitReplicaInfo() { return null; @@ -332,7 +327,7 @@ public class TestShortCircuitCache { // Make sure that second replica did not go stale. ShortCircuitReplicaInfo info = cache.fetchOrCreate( - new Key(1, "test_bp1"), new ShortCircuitReplicaCreator() { + new ExtendedBlockId(1, "test_bp1"), new ShortCircuitReplicaCreator() { @Override public ShortCircuitReplicaInfo createShortCircuitReplicaInfo() { Assert.fail("second replica went stale, despite 1 " +