This is an automated email from the ASF dual-hosted git repository. sodonnell pushed a commit to branch branch-3.3 in repository https://gitbox.apache.org/repos/asf/hadoop.git
The following commit(s) were added to refs/heads/branch-3.3 by this push: new bd13d73 HDFS-16262. Async refresh of cached locations in DFSInputStream (#3527) bd13d73 is described below commit bd13d73334c8468c5345f80487e8a7f671707428 Author: Bryan Beaudreault <bbeaudrea...@hubspot.com> AuthorDate: Tue Jan 25 06:42:35 2022 -0500 HDFS-16262. Async refresh of cached locations in DFSInputStream (#3527) (cherry picked from commit 94b884ae553e9d67f57e3ae7d2b22468c6f342fa) --- .../java/org/apache/hadoop/hdfs/ClientContext.java | 42 +++ .../java/org/apache/hadoop/hdfs/DFSClient.java | 34 ++- .../org/apache/hadoop/hdfs/DFSInputStream.java | 324 ++++++++++++++------- .../apache/hadoop/hdfs/DFSStripedInputStream.java | 10 +- .../apache/hadoop/hdfs/LocatedBlocksRefresher.java | 210 +++++++++++++ .../hadoop/hdfs/client/HdfsClientConfigKeys.java | 13 + .../hadoop/hdfs/client/impl/DfsClientConf.java | 18 +- .../src/main/resources/hdfs-default.xml | 19 ++ .../hdfs/TestDFSInputStreamBlockLocations.java | 244 +++++++--------- .../hadoop/hdfs/TestLocatedBlocksRefresher.java | 266 +++++++++++++++++ 10 files changed, 915 insertions(+), 265 deletions(-) diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/ClientContext.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/ClientContext.java index 5bb7e03..f83346e 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/ClientContext.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/ClientContext.java @@ -70,6 +70,11 @@ public class ClientContext { private final String name; /** + * The client conf used to initialize context. + */ + private final DfsClientConf dfsClientConf; + + /** * String representation of the configuration. */ private final String confString; @@ -131,6 +136,17 @@ public class ClientContext { private volatile DeadNodeDetector deadNodeDetector = null; /** + * The switch for the {@link LocatedBlocksRefresher}. + */ + private final boolean locatedBlocksRefresherEnabled; + + /** + * Periodically refresh the {@link org.apache.hadoop.hdfs.protocol.LocatedBlocks} backing + * registered {@link DFSInputStream}s, to take advantage of changes in block placement. + */ + private volatile LocatedBlocksRefresher locatedBlocksRefresher = null; + + /** * Count the reference of ClientContext. */ private int counter = 0; @@ -146,6 +162,7 @@ public class ClientContext { final ShortCircuitConf scConf = conf.getShortCircuitConf(); this.name = name; + this.dfsClientConf = conf; this.confString = scConf.confAsString(); this.clientShortCircuitNum = conf.getClientShortCircuitNum(); this.shortCircuitCache = new ShortCircuitCache[this.clientShortCircuitNum]; @@ -164,6 +181,7 @@ public class ClientContext { this.byteArrayManager = ByteArrayManager.newInstance( conf.getWriteByteArrayManagerConf()); this.deadNodeDetectionEnabled = conf.isDeadNodeDetectionEnabled(); + this.locatedBlocksRefresherEnabled = conf.isLocatedBlocksRefresherEnabled(); initTopologyResolution(config); } @@ -302,6 +320,21 @@ public class ClientContext { } /** + * If true, LocatedBlocksRefresher will be periodically refreshing LocatedBlocks + * of registered DFSInputStreams. + */ + public boolean isLocatedBlocksRefresherEnabled() { + return locatedBlocksRefresherEnabled; + } + + /** + * Obtain LocatedBlocksRefresher of the current client. + */ + public LocatedBlocksRefresher getLocatedBlocksRefresher() { + return locatedBlocksRefresher; + } + + /** * Increment the counter. Start the dead node detector thread if there is no * reference. */ @@ -311,6 +344,10 @@ public class ClientContext { deadNodeDetector = new DeadNodeDetector(name, configuration); deadNodeDetector.start(); } + if (locatedBlocksRefresherEnabled && locatedBlocksRefresher == null) { + locatedBlocksRefresher = new LocatedBlocksRefresher(name, configuration, dfsClientConf); + locatedBlocksRefresher.start(); + } } /** @@ -324,5 +361,10 @@ public class ClientContext { deadNodeDetector.shutdown(); deadNodeDetector = null; } + + if (counter == 0 && locatedBlocksRefresherEnabled && locatedBlocksRefresher != null) { + locatedBlocksRefresher.shutdown(); + locatedBlocksRefresher = null; + } } } diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClient.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClient.java index 7500273..ea9736d 100755 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClient.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClient.java @@ -870,7 +870,7 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory, } public long getRefreshReadBlkLocationsInterval() { - return dfsClientConf.getRefreshReadBlockLocationsMS(); + return dfsClientConf.getLocatedBlocksRefresherInterval(); } public LocatedBlocks getLocatedBlocks(String src, long start) @@ -3402,4 +3402,36 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory, public DeadNodeDetector getDeadNodeDetector() { return clientContext.getDeadNodeDetector(); } + + /** + * Obtain LocatedBlocksRefresher of the current client. + */ + public LocatedBlocksRefresher getLocatedBlockRefresher() { + return clientContext.getLocatedBlocksRefresher(); + } + + /** + * Adds the {@link DFSInputStream} to the {@link LocatedBlocksRefresher}, so that + * the underlying {@link LocatedBlocks} is periodically refreshed. + */ + public void addLocatedBlocksRefresh(DFSInputStream dfsInputStream) { + if (isLocatedBlocksRefresherEnabled()) { + clientContext.getLocatedBlocksRefresher().addInputStream(dfsInputStream); + } + } + + /** + * Removes the {@link DFSInputStream} from the {@link LocatedBlocksRefresher}, so that + * the underlying {@link LocatedBlocks} is no longer periodically refreshed. + * @param dfsInputStream + */ + public void removeLocatedBlocksRefresh(DFSInputStream dfsInputStream) { + if (isLocatedBlocksRefresherEnabled()) { + clientContext.getLocatedBlocksRefresher().removeInputStream(dfsInputStream); + } + } + + private boolean isLocatedBlocksRefresherEnabled() { + return clientContext.isLocatedBlocksRefresherEnabled(); + } } diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java index 297bbfa..485b6b4 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java @@ -28,6 +28,7 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; import java.util.EnumSet; +import java.util.HashMap; import java.util.Iterator; import java.util.LinkedList; import java.util.List; @@ -66,6 +67,7 @@ import org.apache.hadoop.hdfs.client.impl.DfsClientConf; import org.apache.hadoop.hdfs.protocol.BlockType; import org.apache.hadoop.hdfs.protocol.ClientDatanodeProtocol; import org.apache.hadoop.hdfs.protocol.DatanodeInfo; +import org.apache.hadoop.hdfs.protocol.DatanodeInfoWithStorage; import org.apache.hadoop.hdfs.protocol.ExtendedBlock; import org.apache.hadoop.hdfs.protocol.LocatedBlock; import org.apache.hadoop.hdfs.protocol.LocatedBlocks; @@ -128,18 +130,18 @@ public class DFSInputStream extends FSInputStream private long lastBlockBeingWrittenLength = 0; private FileEncryptionInfo fileEncryptionInfo = null; protected CachingStrategy cachingStrategy; + // this is volatile because it will be polled outside the lock, + // but still only updated within the lock + private volatile long lastRefreshedBlocksAt = Time.monotonicNow(); //// + private AtomicBoolean refreshingBlockLocations = new AtomicBoolean(false); protected final ReadStatistics readStatistics = new ReadStatistics(); // lock for state shared between read and pread // Note: Never acquire a lock on <this> with this lock held to avoid deadlocks // (it's OK to acquire this lock when the lock on <this> is held) protected final Object infoLock = new Object(); - // refresh locatedBlocks periodically - private final long refreshReadBlockIntervals; - /** timeStamp of the last time a block location was refreshed. */ - private long locatedBlocksTimeStamp; /** * Track the ByteBuffers that we have handed out to readers. * @@ -156,10 +158,6 @@ public class DFSInputStream extends FSInputStream return extendedReadBuffers; } - private boolean isPeriodicRefreshEnabled() { - return (refreshReadBlockIntervals > 0L); - } - /** * This variable tracks the number of failures since the start of the * most recent user-facing operation. That is to say, it should be reset @@ -206,9 +204,6 @@ public class DFSInputStream extends FSInputStream DFSInputStream(DFSClient dfsClient, String src, boolean verifyChecksum, LocatedBlocks locatedBlocks) throws IOException { this.dfsClient = dfsClient; - this.refreshReadBlockIntervals = - this.dfsClient.getRefreshReadBlkLocationsInterval(); - setLocatedBlocksTimeStamp(); this.verifyChecksum = verifyChecksum; this.src = src; synchronized (infoLock) { @@ -228,19 +223,6 @@ public class DFSInputStream extends FSInputStream return deadNodes.containsKey(nodeInfo); } - @VisibleForTesting - void setReadTimeStampsForTesting(long timeStamp) { - setLocatedBlocksTimeStamp(timeStamp); - } - - private void setLocatedBlocksTimeStamp() { - setLocatedBlocksTimeStamp(Time.monotonicNow()); - } - - private void setLocatedBlocksTimeStamp(long timeStamp) { - this.locatedBlocksTimeStamp = timeStamp; - } - /** * Grab the open-file info from namenode * @param refreshLocatedBlocks whether to re-fetch locatedblocks @@ -248,33 +230,50 @@ public class DFSInputStream extends FSInputStream void openInfo(boolean refreshLocatedBlocks) throws IOException { final DfsClientConf conf = dfsClient.getConf(); synchronized(infoLock) { - lastBlockBeingWrittenLength = - fetchLocatedBlocksAndGetLastBlockLength(refreshLocatedBlocks); int retriesForLastBlockLength = conf.getRetryTimesForGetLastBlockLength(); - while (retriesForLastBlockLength > 0) { + + while (true) { + LocatedBlocks newLocatedBlocks; + if (locatedBlocks == null || refreshLocatedBlocks) { + newLocatedBlocks = fetchAndCheckLocatedBlocks(locatedBlocks); + } else { + newLocatedBlocks = locatedBlocks; + } + + long lastBlockLength = getLastBlockLength(newLocatedBlocks); + if (lastBlockLength != -1) { + setLocatedBlocksFields(newLocatedBlocks, lastBlockLength); + return; + } + // Getting last block length as -1 is a special case. When cluster // restarts, DNs may not report immediately. At this time partial block // locations will not be available with NN for getting the length. Lets // retry for 3 times to get the length. - if (lastBlockBeingWrittenLength == -1) { - DFSClient.LOG.warn("Last block locations not available. " - + "Datanodes might not have reported blocks completely." - + " Will retry for " + retriesForLastBlockLength + " times"); - waitFor(conf.getRetryIntervalForGetLastBlockLength()); - lastBlockBeingWrittenLength = - fetchLocatedBlocksAndGetLastBlockLength(true); - } else { - break; + + if (retriesForLastBlockLength-- <= 0) { + throw new IOException("Could not obtain the last block locations."); } - retriesForLastBlockLength--; - } - if (lastBlockBeingWrittenLength == -1 - && retriesForLastBlockLength == 0) { - throw new IOException("Could not obtain the last block locations."); + + DFSClient.LOG.warn("Last block locations not available. " + + "Datanodes might not have reported blocks completely." + + " Will retry for " + retriesForLastBlockLength + " times"); + waitFor(conf.getRetryIntervalForGetLastBlockLength()); } } } + /** + * Set locatedBlocks and related fields, using the passed lastBlockLength. + * Should be called within infoLock. + */ + private void setLocatedBlocksFields(LocatedBlocks locatedBlocksToSet, long lastBlockLength) { + locatedBlocks = locatedBlocksToSet; + lastBlockBeingWrittenLength = lastBlockLength; + fileEncryptionInfo = locatedBlocks.getFileEncryptionInfo(); + setLastRefreshedBlocksAt(); + } + private void waitFor(int waitTime) throws IOException { try { Thread.sleep(waitTime); @@ -285,62 +284,18 @@ public class DFSInputStream extends FSInputStream } } - /** - * Checks whether the block locations timestamps have expired. - * In the case of expired timestamp: - * - clear list of deadNodes - * - call openInfo(true) which will re-fetch locatedblocks - * - update locatedBlocksTimeStamp - * @return true when the expiration feature is enabled and locatedblocks - * timestamp has expired. - * @throws IOException - */ - private boolean isLocatedBlocksExpired() { - if (!isPeriodicRefreshEnabled()) { - return false; - } - long now = Time.monotonicNow(); - long elapsed = now - locatedBlocksTimeStamp; - if (elapsed < refreshReadBlockIntervals) { - return false; - } - return true; - } - - /** - * Update the block locations timestamps if they have expired. - * In the case of expired timestamp: - * - clear list of deadNodes - * - call openInfo(true) which will re-fetch locatedblocks - * - update locatedBlocksTimeStamp - * @return true when the locatedblocks list is re-fetched from the namenode. - * @throws IOException - */ - private boolean updateBlockLocationsStamp() throws IOException { - if (!isLocatedBlocksExpired()) { - return false; - } - // clear dead nodes - deadNodes.clear(); - openInfo(true); - setLocatedBlocksTimeStamp(); - return true; - } - - private long fetchLocatedBlocksAndGetLastBlockLength(boolean refresh) + private LocatedBlocks fetchAndCheckLocatedBlocks(LocatedBlocks existing) throws IOException { - LocatedBlocks newInfo = locatedBlocks; - if (locatedBlocks == null || refresh) { - newInfo = dfsClient.getLocatedBlocks(src, 0); - } + LocatedBlocks newInfo = dfsClient.getLocatedBlocks(src, 0); + DFSClient.LOG.debug("newInfo = {}", newInfo); if (newInfo == null) { throw new IOException("Cannot open filename " + src); } - if (locatedBlocks != null) { + if (existing != null) { Iterator<LocatedBlock> oldIter = - locatedBlocks.getLocatedBlocks().iterator(); + existing.getLocatedBlocks().iterator(); Iterator<LocatedBlock> newIter = newInfo.getLocatedBlocks().iterator(); while (oldIter.hasNext() && newIter.hasNext()) { if (!oldIter.next().getBlock().equals(newIter.next().getBlock())) { @@ -348,17 +303,14 @@ public class DFSInputStream extends FSInputStream } } } - locatedBlocks = newInfo; - long lastBlkBeingWrittenLength = getLastBlockLength(); - fileEncryptionInfo = locatedBlocks.getFileEncryptionInfo(); - return lastBlkBeingWrittenLength; + return newInfo; } - private long getLastBlockLength() throws IOException{ + private long getLastBlockLength(LocatedBlocks blocks) throws IOException{ long lastBlockBeingWrittenLength = 0; - if (!locatedBlocks.isLastBlockComplete()) { - final LocatedBlock last = locatedBlocks.getLastLocatedBlock(); + if (!blocks.isLastBlockComplete()) { + final LocatedBlock last = blocks.getLastLocatedBlock(); if (last != null) { if (last.getLocations().length == 0) { if (last.getBlockSize() == 0) { @@ -501,6 +453,14 @@ public class DFSInputStream extends FSInputStream return getBlockRange(0, getFileLength()); } + protected String getSrc() { + return src; + } + + protected LocatedBlocks getLocatedBlocks() { + return locatedBlocks; + } + /** * Get block at the specified position. * Fetch it from the namenode if not cached. @@ -543,8 +503,8 @@ public class DFSInputStream extends FSInputStream /** Fetch a block from namenode and cache it */ private LocatedBlock fetchBlockAt(long offset, long length, boolean useCache) throws IOException { + maybeRegisterBlockRefresh(); synchronized(infoLock) { - updateBlockLocationsStamp(); int targetBlockIdx = locatedBlocks.findBlock(offset); if (targetBlockIdx < 0) { // block is not cached targetBlockIdx = LocatedBlocks.getInsertIndex(targetBlockIdx); @@ -559,8 +519,7 @@ public class DFSInputStream extends FSInputStream } // Update the LastLocatedBlock, if offset is for last block. if (offset >= locatedBlocks.getFileLength()) { - locatedBlocks = newBlocks; - lastBlockBeingWrittenLength = getLastBlockLength(); + setLocatedBlocksFields(newBlocks, getLastBlockLength(newBlocks)); } else { locatedBlocks.insertRange(targetBlockIdx, newBlocks.getLocatedBlocks()); @@ -587,6 +546,7 @@ public class DFSInputStream extends FSInputStream throw new IOException("Offset: " + offset + " exceeds file length: " + getFileLength()); } + synchronized(infoLock) { final List<LocatedBlock> blocks; final long lengthOfCompleteBlk = locatedBlocks.getFileLength(); @@ -644,6 +604,9 @@ public class DFSInputStream extends FSInputStream if (target >= getFileLength()) { throw new IOException("Attempted to read past end of file"); } + + maybeRegisterBlockRefresh(); + // Will be getting a new BlockReader. closeCurrentBlockReaders(); @@ -657,9 +620,6 @@ public class DFSInputStream extends FSInputStream boolean connectFailedOnce = false; while (true) { - // Re-fetch the locatedBlocks from NN if the timestamp has expired. - updateBlockLocationsStamp(); - // // Compute desired block // @@ -793,6 +753,7 @@ public class DFSInputStream extends FSInputStream * this dfsInputStream anymore. */ dfsClient.removeNodeFromDeadNodeDetector(this, locatedBlocks); + maybeDeRegisterBlockRefresh(); } } @@ -871,16 +832,16 @@ public class DFSInputStream extends FSInputStream int len = strategy.getTargetLength(); CorruptedBlocks corruptedBlocks = new CorruptedBlocks(); failures = 0; + + maybeRegisterBlockRefresh(); + if (pos < getFileLength()) { int retries = 2; while (retries > 0) { try { // currentNode can be left as null if previous read had a checksum // error on the same block. See HDFS-3067 - // currentNode needs to be updated if the blockLocations timestamp has - // expired. - if (pos > blockEnd || currentNode == null - || updateBlockLocationsStamp()) { + if (pos > blockEnd || currentNode == null) { currentNode = blockSeekTo(pos); } int realLen = (int) Math.min(len, (blockEnd - pos + 1L)); @@ -1958,4 +1919,153 @@ public class DFSInputStream extends FSInputStream return false; } } + + /** + * Many DFSInputStreams can be opened and closed in quick succession, in which case + * they would be registered/deregistered but never need to be refreshed. + * Defers registering with the located block refresher, in order to avoid an additional + * source of unnecessary synchronization for short-lived DFSInputStreams. + */ + protected void maybeRegisterBlockRefresh() { + if (!dfsClient.getConf().isRefreshReadBlockLocationsAutomatically() + || !dfsClient.getConf().isLocatedBlocksRefresherEnabled()) { + return; + } + + if (refreshingBlockLocations.get()) { + return; + } + + // not enough time elapsed to refresh + long timeSinceLastRefresh = Time.monotonicNow() - lastRefreshedBlocksAt; + if (timeSinceLastRefresh < dfsClient.getConf().getLocatedBlocksRefresherInterval()) { + return; + } + + if (!refreshingBlockLocations.getAndSet(true)) { + dfsClient.addLocatedBlocksRefresh(this); + } + } + + /** + * De-register periodic refresh of this inputstream, if it was added to begin with. + */ + private void maybeDeRegisterBlockRefresh() { + if (refreshingBlockLocations.get()) { + dfsClient.removeLocatedBlocksRefresh(this); + } + } + + /** + * Refresh blocks for the input stream, if necessary. + * + * @param addressCache optional map to use as a cache for resolving datanode InetSocketAddress + * @return whether a refresh was performed or not + */ + boolean refreshBlockLocations(Map<String, InetSocketAddress> addressCache) { + LocatedBlocks blocks; + synchronized (infoLock) { + blocks = getLocatedBlocks(); + } + + if (getLocalDeadNodes().isEmpty() && allBlocksLocal(blocks, addressCache)) { + return false; + } + + try { + DFSClient.LOG.debug("Refreshing {} for path {}", this, getSrc()); + LocatedBlocks newLocatedBlocks = fetchAndCheckLocatedBlocks(blocks); + long lastBlockLength = getLastBlockLength(newLocatedBlocks); + if (lastBlockLength == -1) { + DFSClient.LOG.debug( + "Discarding refreshed blocks for path {} because lastBlockLength was -1", + getSrc()); + return true; + } + + setRefreshedValues(newLocatedBlocks, lastBlockLength); + } catch (IOException e) { + DFSClient.LOG.debug("Failed to refresh DFSInputStream for path {}", getSrc(), e); + } + + return true; + } + + /** + * Once new LocatedBlocks have been fetched, sets them on the DFSInputStream and + * updates stateful read location within the necessary locks. + */ + private synchronized void setRefreshedValues(LocatedBlocks blocks, long lastBlockLength) + throws IOException { + synchronized (infoLock) { + setLocatedBlocksFields(blocks, lastBlockLength); + } + + getLocalDeadNodes().clear(); + + // if a stateful read has been initialized, refresh it + if (currentNode != null) { + currentNode = blockSeekTo(pos); + } + } + + private boolean allBlocksLocal(LocatedBlocks blocks, + Map<String, InetSocketAddress> addressCache) { + if (addressCache == null) { + addressCache = new HashMap<>(); + } + + // we only need to check the first location of each block, because the blocks are already + // sorted by distance from the current host + for (LocatedBlock lb : blocks.getLocatedBlocks()) { + if (lb.getLocations().length == 0) { + return false; + } + + DatanodeInfoWithStorage location = lb.getLocations()[0]; + if (location == null) { + return false; + } + + InetSocketAddress targetAddr = addressCache.computeIfAbsent( + location.getDatanodeUuid(), + unused -> { + String dnAddr = location.getXferAddr(dfsClient.getConf().isConnectToDnViaHostname()); + return NetUtils.createSocketAddr( + dnAddr, + -1, + null, + dfsClient.getConf().isUriCacheEnabled()); + }); + + if (!isResolveableAndLocal(targetAddr)) { + return false; + } + } + + return true; + } + + private boolean isResolveableAndLocal(InetSocketAddress targetAddr) { + try { + return DFSUtilClient.isLocalAddress(targetAddr); + } catch (IOException e) { + DFSClient.LOG.debug("Got an error checking if {} is local", targetAddr, e); + return false; + } + } + + @VisibleForTesting + void setLastRefreshedBlocksAtForTesting(long timestamp) { + lastRefreshedBlocksAt = timestamp; + } + + @VisibleForTesting + long getLastRefreshedBlocksAtForTesting() { + return lastRefreshedBlocksAt; + } + + private void setLastRefreshedBlocksAt() { + lastRefreshedBlocksAt = Time.monotonicNow(); + } } diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSStripedInputStream.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSStripedInputStream.java index 6158521..ec4ad35 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSStripedInputStream.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSStripedInputStream.java @@ -143,14 +143,6 @@ public class DFSStripedInputStream extends DFSInputStream { return curStripeBuf; } - protected String getSrc() { - return src; - } - - protected LocatedBlocks getLocatedBlocks() { - return locatedBlocks; - } - protected ByteBufferPool getBufferPool() { return BUFFER_POOL; } @@ -168,6 +160,8 @@ public class DFSStripedInputStream extends DFSInputStream { throw new IOException("Attempted to read past end of file"); } + maybeRegisterBlockRefresh(); + // Will be getting a new BlockReader. closeCurrentBlockReaders(); diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/LocatedBlocksRefresher.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/LocatedBlocksRefresher.java new file mode 100644 index 0000000..454d1f9 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/LocatedBlocksRefresher.java @@ -0,0 +1,210 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs; + +import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_CONTEXT_DEFAULT; +import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_REFRESH_READ_BLOCK_LOCATIONS_THREADS_DEFAULT; +import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_REFRESH_READ_BLOCK_LOCATIONS_THREADS_KEY; + +import java.net.InetSocketAddress; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.Map; +import java.util.Set; +import java.util.WeakHashMap; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Phaser; +import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.atomic.AtomicInteger; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hdfs.client.impl.DfsClientConf; +import org.apache.hadoop.hdfs.protocol.LocatedBlocks; +import org.apache.hadoop.util.Daemon; +import org.apache.hadoop.util.Time; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Periodically refresh the underlying cached {@link LocatedBlocks} for eligible registered + * {@link DFSInputStream}s. DFSInputStreams are eligible for refreshing if they have any + * deadNodes or any blocks are lacking local replicas. + * Disabled by default, unless an interval is configured. + */ +public class LocatedBlocksRefresher extends Daemon { + private static final Logger LOG = + LoggerFactory.getLogger(LocatedBlocksRefresher.class); + + private static final String THREAD_PREFIX = "located-block-refresher-"; + + private final String name; + private final long interval; + private final long jitter; + private final ExecutorService refreshThreadPool; + + // Use WeakHashMap so that we don't hold onto references that might have not been explicitly + // closed because they were created and thrown away. + private final Set<DFSInputStream> registeredInputStreams = + Collections.newSetFromMap(new WeakHashMap<>()); + + private int runCount; + private int refreshCount; + + LocatedBlocksRefresher(String name, Configuration conf, DfsClientConf dfsClientConf) { + this.name = name; + this.interval = dfsClientConf.getLocatedBlocksRefresherInterval(); + this.jitter = Math.round(this.interval * 0.1); + int rpcThreads = conf.getInt(DFS_CLIENT_REFRESH_READ_BLOCK_LOCATIONS_THREADS_KEY, + DFS_CLIENT_REFRESH_READ_BLOCK_LOCATIONS_THREADS_DEFAULT); + + String threadPrefix; + if (name.equals(DFS_CLIENT_CONTEXT_DEFAULT)) { + threadPrefix = THREAD_PREFIX; + } else { + threadPrefix = THREAD_PREFIX + name + "-"; + } + + this.refreshThreadPool = Executors.newFixedThreadPool(rpcThreads, new Daemon.DaemonFactory() { + private final AtomicInteger threadIndex = new AtomicInteger(0); + + @Override + public Thread newThread(Runnable r) { + Thread t = super.newThread(r); + t.setName(threadPrefix + threadIndex.getAndIncrement()); + return t; + } + }); + + setName(threadPrefix + "main"); + + LOG.info("Start located block refresher for DFSClient {}.", this.name); + } + + @Override + public void run() { + while (!Thread.currentThread().isInterrupted()) { + + if (!waitForInterval()) { + return; + } + + LOG.debug("Running refresh for {} streams", registeredInputStreams.size()); + long start = Time.monotonicNow(); + AtomicInteger neededRefresh = new AtomicInteger(0); + + Phaser phaser = new Phaser(1); + + Map<String, InetSocketAddress> addressCache = new ConcurrentHashMap<>(); + + for (DFSInputStream inputStream : getInputStreams()) { + phaser.register(); + refreshThreadPool.submit(() -> { + try { + if (isInputStreamTracked(inputStream) && + inputStream.refreshBlockLocations(addressCache)) { + neededRefresh.incrementAndGet(); + } + } finally { + phaser.arriveAndDeregister(); + } + }); + } + + phaser.arriveAndAwaitAdvance(); + + synchronized (this) { + runCount++; + refreshCount += neededRefresh.get(); + } + + LOG.debug( + "Finished refreshing {} of {} streams in {}ms", + neededRefresh, + registeredInputStreams.size(), + Time.monotonicNow() - start + ); + } + } + + public synchronized int getRunCount() { + return runCount; + } + + public synchronized int getRefreshCount() { + return refreshCount; + } + + private boolean waitForInterval() { + try { + Thread.sleep(interval + ThreadLocalRandom.current().nextLong(-jitter, jitter)); + return true; + } catch (InterruptedException e) { + LOG.debug("Interrupted during wait interval", e); + Thread.currentThread().interrupt(); + return false; + } + } + + /** + * Shutdown all the threads. + */ + public void shutdown() { + if (isAlive()) { + interrupt(); + try { + join(); + } catch (InterruptedException e) { + } + } + refreshThreadPool.shutdown(); + } + + /** + * Collects the DFSInputStreams to a list within synchronization, so that we can iterate them + * without potentially blocking callers to {@link #addInputStream(DFSInputStream)} or + * {@link #removeInputStream(DFSInputStream)}. We don't care so much about missing additions, + * and we'll guard against removals by doing an additional + * {@link #isInputStreamTracked(DFSInputStream)} track during iteration. + */ + private synchronized Collection<DFSInputStream> getInputStreams() { + return new ArrayList<>(registeredInputStreams); + } + + public synchronized void addInputStream(DFSInputStream dfsInputStream) { + LOG.trace("Registering {} for {}", dfsInputStream, dfsInputStream.getSrc()); + registeredInputStreams.add(dfsInputStream); + } + + public synchronized void removeInputStream(DFSInputStream dfsInputStream) { + if (isInputStreamTracked(dfsInputStream)) { + LOG.trace("De-registering {} for {}", dfsInputStream, dfsInputStream.getSrc()); + registeredInputStreams.remove(dfsInputStream); + } + } + + public synchronized boolean isInputStreamTracked(DFSInputStream dfsInputStream) { + return registeredInputStreams.contains(dfsInputStream); + } + + public long getInterval() { + return interval; + } +} diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/HdfsClientConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/HdfsClientConfigKeys.java index 6a8a189..cf12793 100755 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/HdfsClientConfigKeys.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/HdfsClientConfigKeys.java @@ -199,6 +199,19 @@ public interface HdfsClientConfigKeys { "dfs.client.refresh.read-block-locations.ms"; long DFS_CLIENT_REFRESH_READ_BLOCK_LOCATIONS_MS_DEFAULT = 0L; + // Number of threads to use for refreshing LocatedBlocks of registered + // DFSInputStreams. If a DFSClient opens many DFSInputStreams, increasing + // this may help refresh them all in a timely manner. + String DFS_CLIENT_REFRESH_READ_BLOCK_LOCATIONS_THREADS_KEY = + "dfs.client.refresh.read-block-locations.threads"; + int DFS_CLIENT_REFRESH_READ_BLOCK_LOCATIONS_THREADS_DEFAULT = 5; + + // Whether to auto-register all DFSInputStreams for background refreshes. + // If false, user must manually register using DFSClient#addLocatedBlocksRefresh(DFSInputStream) + String DFS_CLIENT_REFRESH_READ_BLOCK_LOCATIONS_AUTOMATICALLY_KEY = + "dfs.client.refresh.read-block-locations.register-automatically"; + boolean DFS_CLIENT_REFRESH_READ_BLOCK_LOCATIONS_AUTOMATICALLY_DEFAULT = true; + String DFS_DATANODE_KERBEROS_PRINCIPAL_KEY = "dfs.datanode.kerberos.principal"; String DFS_DATANODE_READAHEAD_BYTES_KEY = "dfs.datanode.readahead.bytes"; diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/impl/DfsClientConf.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/impl/DfsClientConf.java index 4ccfedf..5701389 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/impl/DfsClientConf.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/impl/DfsClientConf.java @@ -144,6 +144,7 @@ public class DfsClientConf { /** wait time window before refreshing blocklocation for inputstream. */ private final long refreshReadBlockLocationsMS; + private final boolean refreshReadBlockLocationsAutomatically; private final ShortCircuitConf shortCircuitConf; private final int clientShortCircuitNum; @@ -266,6 +267,10 @@ public class DfsClientConf { HdfsClientConfigKeys. DFS_CLIENT_REFRESH_READ_BLOCK_LOCATIONS_MS_DEFAULT); + refreshReadBlockLocationsAutomatically = conf.getBoolean( + HdfsClientConfigKeys.DFS_CLIENT_REFRESH_READ_BLOCK_LOCATIONS_AUTOMATICALLY_KEY, + HdfsClientConfigKeys.DFS_CLIENT_REFRESH_READ_BLOCK_LOCATIONS_AUTOMATICALLY_DEFAULT); + hedgedReadThresholdMillis = conf.getLong( HedgedRead.THRESHOLD_MILLIS_KEY, HedgedRead.THRESHOLD_MILLIS_DEFAULT); @@ -696,13 +701,18 @@ public class DfsClientConf { return replicaAccessorBuilderClasses; } - /** - * @return the replicaAccessorBuilderClasses - */ - public long getRefreshReadBlockLocationsMS() { + public boolean isLocatedBlocksRefresherEnabled() { + return refreshReadBlockLocationsMS > 0; + } + + public long getLocatedBlocksRefresherInterval() { return refreshReadBlockLocationsMS; } + public boolean isRefreshReadBlockLocationsAutomatically() { + return refreshReadBlockLocationsAutomatically; + } + /** * @return the shortCircuitConf */ diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml index 80c4818..af547fd 100755 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml @@ -3260,6 +3260,25 @@ </description> </property> + <property> + <name>dfs.client.refresh.read-block-locations.register-automatically</name> + <value>true</value> + <description> + Whether to auto-register all DFSInputStreams for background LocatedBlock refreshes. + If false, user must manually register using DFSClient#addLocatedBlocksRefresh(DFSInputStream) + </description> + </property> + + <property> + <name>dfs.client.refresh.read-block-locations.threads</name> + <value>5</value> + <description> + Number of threads to use for refreshing LocatedBlocks of registered + DFSInputStreams. If a DFSClient opens many DFSInputStreams, increasing + this may help refresh them all in a timely manner. + </description> + </property> + <property> <name>dfs.namenode.lease-recheck-interval-ms</name> <value>2000</value> diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSInputStreamBlockLocations.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSInputStreamBlockLocations.java index 9fed914..50378f6 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSInputStreamBlockLocations.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSInputStreamBlockLocations.java @@ -21,28 +21,25 @@ package org.apache.hadoop.hdfs; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCK_SIZE_KEY; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertNotEquals; -import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNotSame; +import static org.junit.Assert.assertSame; import static org.junit.Assert.assertTrue; import java.io.IOException; import java.net.InetSocketAddress; import java.util.Arrays; import java.util.Collection; -import java.util.List; +import java.util.HashMap; +import java.util.Map; + import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hdfs.MiniDFSCluster.DataNodeProperties; import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys; -import org.apache.hadoop.hdfs.protocol.DatanodeInfo; -import org.apache.hadoop.hdfs.protocol.LocatedBlock; +import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType; import org.apache.hadoop.hdfs.protocol.LocatedBlocks; import org.apache.hadoop.hdfs.server.datanode.DataNode; -import org.apache.hadoop.hdfs.server.datanode.InternalDataNodeTestUtils; -import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration; import org.apache.hadoop.util.Time; import org.junit.After; -import org.junit.Assert; import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; @@ -137,154 +134,111 @@ public class TestDFSInputStreamBlockLocations { } @Test - public void testRead() throws Exception { + public void testRefreshBlockLocations() throws IOException { + final String fileName = "/test_cache_locations"; + filePath = createFile(fileName); + + try (DFSInputStream fin = dfsClient.open(fileName)) { + LocatedBlocks existing = fin.locatedBlocks; + long lastRefreshedAt = fin.getLastRefreshedBlocksAtForTesting(); + + assertFalse("should not have attempted refresh", + fin.refreshBlockLocations(null)); + assertEquals("should not have updated lastRefreshedAt", + lastRefreshedAt, fin.getLastRefreshedBlocksAtForTesting()); + assertSame("should not have modified locatedBlocks", + existing, fin.locatedBlocks); + + // fake a dead node to force refresh + // refreshBlockLocations should return true, indicating we attempted a refresh + // nothing should be changed, because locations have not changed + fin.addToLocalDeadNodes(dfsClient.datanodeReport(DatanodeReportType.LIVE)[0]); + assertTrue("should have attempted refresh", + fin.refreshBlockLocations(null)); + verifyChanged(fin, existing, lastRefreshedAt); + + // reset + lastRefreshedAt = fin.getLastRefreshedBlocksAtForTesting(); + existing = fin.locatedBlocks; + + // It's hard to test explicitly for non-local nodes, but we can fake it + // because we also treat unresolved as non-local. Pass in a cache where all the datanodes + // are unresolved hosts. + Map<String, InetSocketAddress> mockAddressCache = new HashMap<>(); + InetSocketAddress unresolved = InetSocketAddress.createUnresolved("www.google.com", 80); + for (DataNode dataNode : dfsCluster.getDataNodes()) { + mockAddressCache.put(dataNode.getDatanodeUuid(), unresolved); + } + + assertTrue("should have attempted refresh", + fin.refreshBlockLocations(mockAddressCache)); + verifyChanged(fin, existing, lastRefreshedAt); + } + } + + private void verifyChanged(DFSInputStream fin, LocatedBlocks existing, long lastRefreshedAt) { + assertTrue("lastRefreshedAt should have incremented", + fin.getLastRefreshedBlocksAtForTesting() > lastRefreshedAt); + assertNotSame("located blocks should have changed", + existing, fin.locatedBlocks); + assertTrue("deadNodes should be empty", + fin.getLocalDeadNodes().isEmpty()); + } + + @Test + public void testDeferredRegistrationStatefulRead() throws IOException { + testWithRegistrationMethod(DFSInputStream::read); + } + + @Test + public void testDeferredRegistrationPositionalRead() throws IOException { + testWithRegistrationMethod(fin -> fin.readFully(0, new byte[1])); + } + + @Test + public void testDeferredRegistrationGetAllBlocks() throws IOException { + testWithRegistrationMethod(DFSInputStream::getAllBlocks); + } + + @FunctionalInterface + interface ThrowingConsumer { + void accept(DFSInputStream fin) throws IOException; + } + + private void testWithRegistrationMethod(ThrowingConsumer registrationMethod) throws IOException { final String fileName = "/test_cache_locations"; - filePath = new Path(fileName); + filePath = createFile(fileName); + DFSInputStream fin = null; - FSDataOutputStream fout = null; try { - // create a file and write for testing - fout = fs.create(filePath, REPLICATION_FACTOR); - fout.write(new byte[(fileLength)]); - // finalize the file by closing the output stream - fout.close(); - fout = null; - // get the located blocks - LocatedBlocks referenceLocatedBlocks = - dfsClient.getLocatedBlocks(fileName, 0, fileLength); - assertEquals(numOfBlocks, referenceLocatedBlocks.locatedBlockCount()); - String poolId = dfsCluster.getNamesystem().getBlockPoolId(); fin = dfsClient.open(fileName); - // get the located blocks from fin - LocatedBlocks finLocatedBlocks = fin.locatedBlocks; - assertEquals(dfsClientPrefetchSize / BLOCK_SIZE, - finLocatedBlocks.locatedBlockCount()); - final int chunkReadSize = BLOCK_SIZE / 4; - byte[] readBuffer = new byte[chunkReadSize]; - // read the first block - DatanodeInfo prevDNInfo = null; - DatanodeInfo currDNInfo = null; - int bytesRead = 0; - int firstBlockMark = BLOCK_SIZE; - // get the second block locations - LocatedBlock firstLocatedBlk = - fin.locatedBlocks.getLocatedBlocks().get(0); - DatanodeInfo[] firstBlkDNInfos = firstLocatedBlk.getLocations(); - while (fin.getPos() < firstBlockMark) { - bytesRead = fin.read(readBuffer); - Assert.assertTrue("Unexpected number of read bytes", - chunkReadSize >= bytesRead); - if (currDNInfo == null) { - currDNInfo = fin.getCurrentDatanode(); - assertNotNull("current FIS datanode is null", currDNInfo); - continue; - } - prevDNInfo = currDNInfo; - currDNInfo = fin.getCurrentDatanode(); - assertEquals("the DFSInput stream does not read from same node", - prevDNInfo, currDNInfo); - } + assertFalse("should not be tracking input stream on open", + dfsClient.getLocatedBlockRefresher().isInputStreamTracked(fin)); - assertEquals("InputStream exceeds expected position", - firstBlockMark, fin.getPos()); - // get the second block locations - LocatedBlock secondLocatedBlk = - fin.locatedBlocks.getLocatedBlocks().get(1); - // get the nodeinfo for that block - DatanodeInfo[] secondBlkDNInfos = secondLocatedBlk.getLocations(); - DatanodeInfo deadNodeInfo = secondBlkDNInfos[0]; - // stop the datanode in the list of the - DataNode deadNode = getdataNodeFromHostName(dfsCluster, - deadNodeInfo.getHostName()); - // Shutdown and wait for datanode to be marked dead - DatanodeRegistration reg = InternalDataNodeTestUtils. - getDNRegistrationForBP(dfsCluster.getDataNodes().get(0), poolId); - DataNodeProperties stoppedDNProps = - dfsCluster.stopDataNode(deadNodeInfo.getName()); + // still not registered because it hasn't been an hour by the time we call this + registrationMethod.accept(fin); + assertFalse("should not be tracking input stream after first read", + dfsClient.getLocatedBlockRefresher().isInputStreamTracked(fin)); - List<DataNode> datanodesPostStoppage = dfsCluster.getDataNodes(); - assertEquals(NUM_DATA_NODES - 1, datanodesPostStoppage.size()); - // get the located blocks - LocatedBlocks afterStoppageLocatedBlocks = - dfsClient.getLocatedBlocks(fileName, 0, fileLength); - // read second block - int secondBlockMark = (int) (1.5 * BLOCK_SIZE); - boolean firstIteration = true; - if (this.enableBlkExpiration) { - // set the time stamps to make sure that we do not refresh locations yet - fin.setReadTimeStampsForTesting(Time.monotonicNow()); - } - while (fin.getPos() < secondBlockMark) { - bytesRead = fin.read(readBuffer); - assertTrue("dead node used to read at position: " + fin.getPos(), - fin.deadNodesContain(deadNodeInfo)); - Assert.assertTrue("Unexpected number of read bytes", - chunkReadSize >= bytesRead); - prevDNInfo = currDNInfo; - currDNInfo = fin.getCurrentDatanode(); - assertNotEquals(deadNodeInfo, currDNInfo); - if (firstIteration) { - // currDNInfo has to be different unless first block locs is different - assertFalse("FSInputStream should pick a different DN", - firstBlkDNInfos[0].equals(deadNodeInfo) - && prevDNInfo.equals(currDNInfo)); - firstIteration = false; - } - } - assertEquals("InputStream exceeds expected position", - secondBlockMark, fin.getPos()); - // restart the dead node with the same port - assertTrue(dfsCluster.restartDataNode(stoppedDNProps, true)); - dfsCluster.waitActive(); - List<DataNode> datanodesPostRestart = dfsCluster.getDataNodes(); - assertEquals(NUM_DATA_NODES, datanodesPostRestart.size()); - // continue reading from block 2 again. We should read from deadNode - int thirdBlockMark = 2 * BLOCK_SIZE; - firstIteration = true; - while (fin.getPos() < thirdBlockMark) { - bytesRead = fin.read(readBuffer); - if (this.enableBlkExpiration) { - assertEquals("node is removed from deadNodes after 1st iteration", - firstIteration, fin.deadNodesContain(deadNodeInfo)); - } else { - assertTrue(fin.deadNodesContain(deadNodeInfo)); - } - Assert.assertTrue("Unexpected number of read bytes", - chunkReadSize >= bytesRead); - prevDNInfo = currDNInfo; - currDNInfo = fin.getCurrentDatanode(); - if (!this.enableBlkExpiration) { - assertNotEquals(deadNodeInfo, currDNInfo); - } - if (firstIteration) { - assertEquals(prevDNInfo, currDNInfo); - firstIteration = false; - if (this.enableBlkExpiration) { - // reset the time stamps of located blocks to force cache expiration - fin.setReadTimeStampsForTesting( - Time.monotonicNow() - (dfsInputLocationsTimeout + 1)); - } - } - } - assertEquals("InputStream exceeds expected position", - thirdBlockMark, fin.getPos()); + // artificially make it have been an hour + fin.setLastRefreshedBlocksAtForTesting(Time.monotonicNow() - (dfsInputLocationsTimeout + 1)); + registrationMethod.accept(fin); + assertEquals("SHOULD be tracking input stream on read after interval, only if enabled", + enableBlkExpiration, dfsClient.getLocatedBlockRefresher().isInputStreamTracked(fin)); } finally { - if (fout != null) { - fout.close(); - } if (fin != null) { fin.close(); + assertFalse(dfsClient.getLocatedBlockRefresher().isInputStreamTracked(fin)); } + fs.delete(filePath, true); } } - private DataNode getdataNodeFromHostName(MiniDFSCluster cluster, - String hostName) { - for (DataNode dn : cluster.getDataNodes()) { - if (dn.getDatanodeId().getHostName().equals(hostName)) { - return dn; - } + private Path createFile(String fileName) throws IOException { + Path path = new Path(fileName); + try (FSDataOutputStream fout = fs.create(path, REPLICATION_FACTOR)) { + fout.write(new byte[(fileLength)]); } - return null; + return path; } -} \ No newline at end of file +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestLocatedBlocksRefresher.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestLocatedBlocksRefresher.java new file mode 100644 index 0000000..5d8c479 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestLocatedBlocksRefresher.java @@ -0,0 +1,266 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs; + +import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_BLOCK_SIZE_KEY; +import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_REFRESH_READ_BLOCK_LOCATIONS_MS_KEY; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNotSame; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertSame; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +import java.io.IOException; +import java.util.List; +import java.util.UUID; +import java.util.function.Supplier; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hdfs.MiniDFSCluster.DataNodeProperties; +import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys; +import org.apache.hadoop.hdfs.protocol.LocatedBlocks; +import org.apache.hadoop.hdfs.server.datanode.DataNode; +import org.apache.hadoop.util.Time; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class TestLocatedBlocksRefresher { + private static final Logger LOG = LoggerFactory.getLogger(TestLocatedBlocksRefresher.class); + + private static final int BLOCK_SIZE = 1024 * 1024; + private static final short REPLICATION_FACTOR = (short) 4; + private static final String[] RACKS = new String[] { + "/d1/r1", "/d1/r1", "/d1/r2", "/d1/r2", "/d1/r2", "/d2/r3", "/d2/r3" }; + private static final int NUM_DATA_NODES = RACKS.length; + + private final int numOfBlocks = 24; + private final int fileLength = numOfBlocks * BLOCK_SIZE; + private final int dfsClientPrefetchSize = fileLength / 2; + + private MiniDFSCluster cluster; + private Configuration conf; + + @Before + public void setUp() throws Exception { + cluster = null; + conf = new HdfsConfiguration(); + + // disable shortcircuit reading + conf.setBoolean(HdfsClientConfigKeys.Read.ShortCircuit.KEY, false); + // set replication factor + conf.setInt(DFSConfigKeys.DFS_REPLICATION_KEY, REPLICATION_FACTOR); + // set block size and other sizes + conf.setLong(DFS_BLOCK_SIZE_KEY, BLOCK_SIZE); + conf.setLong(HdfsClientConfigKeys.Read.PREFETCH_SIZE_KEY, + dfsClientPrefetchSize); + } + + @After + public void tearDown() throws Exception { + if (cluster != null) { + cluster.shutdown(true, true); + } + } + + private void setupTest(long refreshInterval) throws IOException { + conf.setLong(DFS_CLIENT_REFRESH_READ_BLOCK_LOCATIONS_MS_KEY, refreshInterval); + + // this is necessary to ensure no caching between runs + conf.set("dfs.client.context", UUID.randomUUID().toString()); + + cluster = new MiniDFSCluster.Builder(conf).numDataNodes(NUM_DATA_NODES).racks(RACKS).build(); + cluster.waitActive(); + } + + @Test + public void testDisabledOnZeroInterval() throws IOException { + setupTest(0); + assertNull(cluster.getFileSystem().getClient().getLocatedBlockRefresher()); + } + + @Test + public void testEnabledOnNonZeroInterval() throws Exception { + setupTest(1000); + LocatedBlocksRefresher refresher = + cluster.getFileSystem().getClient().getLocatedBlockRefresher(); + assertNotNull(refresher); + assertNoMoreRefreshes(refresher); + } + + @Test + public void testRefreshOnDeadNodes() throws Exception { + setupTest(1000); + DistributedFileSystem fs = cluster.getFileSystem(); + DFSClient client = fs.getClient(); + LocatedBlocksRefresher refresher = client.getLocatedBlockRefresher(); + + String fileName = createTestFile(fs); + + try (DFSInputStream fin = client.open(fileName)) { + LocatedBlocks locatedBlocks = fin.locatedBlocks; + assertEquals(dfsClientPrefetchSize / BLOCK_SIZE, + locatedBlocks.locatedBlockCount()); + + // should not be tracked yet + assertFalse(refresher.isInputStreamTracked(fin)); + + // track and verify + refresher.addInputStream(fin); + assertTrue(refresher.isInputStreamTracked(fin)); + + // no refreshes yet, as nothing has happened + assertNoMoreRefreshes(refresher); + synchronized (fin.infoLock) { + assertSame(locatedBlocks, fin.locatedBlocks); + } + + stopNodeHostingBlocks(fin, NUM_DATA_NODES - 1); + + // read blocks, which should trigger dead node for the one we stopped + int chunkReadSize = BLOCK_SIZE / 4; + byte[] readBuffer = new byte[chunkReadSize]; + fin.read(0, readBuffer, 0, readBuffer.length); + + assertEquals(1, fin.getLocalDeadNodes().size()); + + // we should get a refresh now + assertRefreshes(refresher, 1); + + // verify that it actually changed things + synchronized (fin.infoLock) { + assertNotSame(locatedBlocks, fin.locatedBlocks); + assertTrue(fin.getLocalDeadNodes().isEmpty()); + } + + // no more refreshes because everything is happy again + assertNoMoreRefreshes(refresher); + + // stop another node, and try to trigger a new deadNode + stopNodeHostingBlocks(fin, NUM_DATA_NODES - 2); + readBuffer = new byte[chunkReadSize]; + fin.read(0, readBuffer, 0, readBuffer.length); + + // we should refresh again now, and verify + // may actually be more than 1, since the first dead node + // may still be listed in the replicas for the bock + assertTrue(fin.getLocalDeadNodes().size() > 0); + + assertRefreshes(refresher, 1); + + synchronized (fin.infoLock) { + assertNotSame(locatedBlocks, fin.locatedBlocks); + assertTrue(fin.getLocalDeadNodes().isEmpty()); + } + + // de-register, and expect no more refreshes below + refresher.removeInputStream(fin); + } + + assertNoMoreRefreshes(refresher); + } + + private void stopNodeHostingBlocks(DFSInputStream fin, int expectedNodes) { + synchronized (fin.infoLock) { + int idx = fin.locatedBlocks.findBlock(0); + for (int i = 0; i < REPLICATION_FACTOR; i++) { + String deadNodeAddr = fin.locatedBlocks.get(idx).getLocations()[i].getXferAddr(); + + DataNodeProperties dataNodeProperties = cluster.stopDataNode(deadNodeAddr); + if (dataNodeProperties != null) { + List<DataNode> datanodesPostStoppage = cluster.getDataNodes(); + assertEquals(expectedNodes, datanodesPostStoppage.size()); + return; + } + } + + throw new RuntimeException("Could not find a datanode to stop"); + } + } + + private void assertNoMoreRefreshes(LocatedBlocksRefresher refresher) throws InterruptedException { + long interval = refresher.getInterval(); + int runCount = refresher.getRunCount(); + int refreshCount = refresher.getRefreshCount(); + + LOG.info("Waiting for at least {} runs, from current {}, expecting no refreshes", + runCount + 3, runCount); + // wait for it to run 3 times, with some buffer + awaitWithTimeout(() -> refresher.getRunCount() > runCount + 3, 5 * interval); + + // it should not have refreshed anything, because no DFSInputStreams registered anymore + assertEquals(refreshCount, refresher.getRefreshCount()); + } + + private void assertRefreshes(LocatedBlocksRefresher refresher, int expectedRefreshes) + throws InterruptedException { + int runCount = refresher.getRunCount(); + int refreshCount = refresher.getRefreshCount(); + int expectedRuns = 3; + + if (expectedRefreshes < 0) { + expectedRefreshes = expectedRuns; + } + + LOG.info( + "Waiting for at least {} runs, from current {}. Expecting {} refreshes, from current {}", + runCount + expectedRuns, runCount, refreshCount + expectedRefreshes, refreshCount + ); + + // wait for it to run 3 times + awaitWithTimeout(() -> refresher.getRunCount() >= runCount + expectedRuns, 10_000); + + // the values may not be identical due to any refreshes that occurred before we opened + // the DFSInputStream but the difference should be identical since we are refreshing + // every time + assertEquals(expectedRefreshes, refresher.getRefreshCount() - refreshCount); + } + + private void awaitWithTimeout(Supplier<Boolean> test, long timeoutMillis) + throws InterruptedException { + long now = Time.monotonicNow(); + + while(!test.get()) { + if (Time.monotonicNow() - now > timeoutMillis) { + fail("Timed out waiting for true condition"); + return; + } + + Thread.sleep(50); + } + } + + private String createTestFile(FileSystem fs) throws IOException { + String fileName = "/located_blocks_" + UUID.randomUUID().toString(); + Path filePath = new Path(fileName); + try (FSDataOutputStream fout = fs.create(filePath, REPLICATION_FACTOR)) { + fout.write(new byte[(fileLength)]); + } + fs.deleteOnExit(filePath); + + return fileName; + } +} --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org For additional commands, e-mail: common-commits-h...@hadoop.apache.org