This is an automated email from the ASF dual-hosted git repository.

sodonnell pushed a commit to branch branch-3.3
in repository https://gitbox.apache.org/repos/asf/hadoop.git


The following commit(s) were added to refs/heads/branch-3.3 by this push:
     new bd13d73  HDFS-16262. Async refresh of cached locations in DFSInputStream (#3527)
bd13d73 is described below

commit bd13d73334c8468c5345f80487e8a7f671707428
Author: Bryan Beaudreault <bbeaudrea...@hubspot.com>
AuthorDate: Tue Jan 25 06:42:35 2022 -0500

    HDFS-16262. Async refresh of cached locations in DFSInputStream (#3527)
    
    (cherry picked from commit 94b884ae553e9d67f57e3ae7d2b22468c6f342fa)
---
 .../java/org/apache/hadoop/hdfs/ClientContext.java |  42 +++
 .../java/org/apache/hadoop/hdfs/DFSClient.java     |  34 ++-
 .../org/apache/hadoop/hdfs/DFSInputStream.java     | 324 ++++++++++++++-------
 .../apache/hadoop/hdfs/DFSStripedInputStream.java  |  10 +-
 .../apache/hadoop/hdfs/LocatedBlocksRefresher.java | 210 +++++++++++++
 .../hadoop/hdfs/client/HdfsClientConfigKeys.java   |  13 +
 .../hadoop/hdfs/client/impl/DfsClientConf.java     |  18 +-
 .../src/main/resources/hdfs-default.xml            |  19 ++
 .../hdfs/TestDFSInputStreamBlockLocations.java     | 244 +++++++---------
 .../hadoop/hdfs/TestLocatedBlocksRefresher.java    | 266 +++++++++++++++++
 10 files changed, 915 insertions(+), 265 deletions(-)
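A brief orientation before the per-file diffs: this patch replaces the old synchronous, read-path expiry check with a shared background LocatedBlocksRefresher daemon that periodically re-fetches block locations for registered DFSInputStreams. The feature stays off by default and is enabled by the pre-existing interval key. A minimal sketch of turning it on (the key name is from this patch's config files; the 5-minute value is illustrative only, not a recommendation):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;

    public class EnableAsyncRefresh {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // A non-zero interval enables the background LocatedBlocksRefresher;
        // dfs.client.refresh.read-block-locations.ms defaults to 0 (disabled).
        conf.setLong("dfs.client.refresh.read-block-locations.ms", 300_000L);
        try (FileSystem fs = FileSystem.get(conf)) {
          // Streams opened by this client become candidates for periodic
          // location refresh once they outlive the configured interval.
        }
      }
    }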

diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/ClientContext.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/ClientContext.java
index 5bb7e03..f83346e 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/ClientContext.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/ClientContext.java
@@ -70,6 +70,11 @@ public class ClientContext {
   private final String name;
 
   /**
+   * The client conf used to initialize context.
+   */
+  private final DfsClientConf dfsClientConf;
+
+  /**
    * String representation of the configuration.
    */
   private final String confString;
@@ -131,6 +136,17 @@ public class ClientContext {
   private volatile DeadNodeDetector deadNodeDetector = null;
 
   /**
+   * The switch for the {@link LocatedBlocksRefresher}.
+   */
+  private final boolean locatedBlocksRefresherEnabled;
+
+  /**
+   * Periodically refresh the {@link org.apache.hadoop.hdfs.protocol.LocatedBlocks} backing
+   * registered {@link DFSInputStream}s, to take advantage of changes in block placement.
+   */
+  private volatile LocatedBlocksRefresher locatedBlocksRefresher = null;
+
+  /**
    * Count the reference of ClientContext.
    */
   private int counter = 0;
@@ -146,6 +162,7 @@ public class ClientContext {
     final ShortCircuitConf scConf = conf.getShortCircuitConf();
 
     this.name = name;
+    this.dfsClientConf = conf;
     this.confString = scConf.confAsString();
     this.clientShortCircuitNum = conf.getClientShortCircuitNum();
     this.shortCircuitCache = new ShortCircuitCache[this.clientShortCircuitNum];
@@ -164,6 +181,7 @@ public class ClientContext {
     this.byteArrayManager = ByteArrayManager.newInstance(
         conf.getWriteByteArrayManagerConf());
     this.deadNodeDetectionEnabled = conf.isDeadNodeDetectionEnabled();
+    this.locatedBlocksRefresherEnabled = conf.isLocatedBlocksRefresherEnabled();
     initTopologyResolution(config);
   }
 
@@ -302,6 +320,21 @@ public class ClientContext {
   }
 
   /**
+   * If true, LocatedBlocksRefresher will be periodically refreshing LocatedBlocks
+   * of registered DFSInputStreams.
+   */
+  public boolean isLocatedBlocksRefresherEnabled() {
+    return locatedBlocksRefresherEnabled;
+  }
+
+  /**
+   * Obtain LocatedBlocksRefresher of the current client.
+   */
+  public LocatedBlocksRefresher getLocatedBlocksRefresher() {
+    return locatedBlocksRefresher;
+  }
+
+  /**
    * Increment the counter. Start the dead node detector thread if there is no
    * reference.
    */
@@ -311,6 +344,10 @@ public class ClientContext {
       deadNodeDetector = new DeadNodeDetector(name, configuration);
       deadNodeDetector.start();
     }
+    if (locatedBlocksRefresherEnabled && locatedBlocksRefresher == null) {
+      locatedBlocksRefresher = new LocatedBlocksRefresher(name, configuration, dfsClientConf);
+      locatedBlocksRefresher.start();
+    }
   }
 
   /**
@@ -324,5 +361,10 @@ public class ClientContext {
       deadNodeDetector.shutdown();
       deadNodeDetector = null;
     }
+
+    if (counter == 0 && locatedBlocksRefresherEnabled && locatedBlocksRefresher != null) {
+      locatedBlocksRefresher.shutdown();
+      locatedBlocksRefresher = null;
+    }
   }
 }
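
The ClientContext hunks above tie the refresher's lifecycle to the context's existing reference counting, mirroring how the DeadNodeDetector is managed: the daemon starts with the first client reference and shuts down with the last. A condensed sketch of that pattern (simplified names, not the actual Hadoop code):

    // Shared daemon start/stop driven by a reference count, as in
    // ClientContext#reference()/unreference() above.
    class RefCountedDaemonHolder {
      private int counter = 0;
      private Thread daemon = null;

      synchronized void reference() {
        counter++;
        if (daemon == null) {
          daemon = new Thread(() -> { /* periodic background work */ });
          daemon.setDaemon(true);
          daemon.start();
        }
      }

      synchronized void unreference() {
        counter--;
        if (counter == 0 && daemon != null) {
          daemon.interrupt();
          daemon = null;
        }
      }
    }
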
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClient.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
index 7500273..ea9736d 100755
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
@@ -870,7 +870,7 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
   }
 
   public long getRefreshReadBlkLocationsInterval() {
-    return dfsClientConf.getRefreshReadBlockLocationsMS();
+    return dfsClientConf.getLocatedBlocksRefresherInterval();
   }
 
   public LocatedBlocks getLocatedBlocks(String src, long start)
@@ -3402,4 +3402,36 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
   public DeadNodeDetector getDeadNodeDetector() {
     return clientContext.getDeadNodeDetector();
   }
+
+  /**
+   * Obtain LocatedBlocksRefresher of the current client.
+   */
+  public LocatedBlocksRefresher getLocatedBlockRefresher() {
+    return clientContext.getLocatedBlocksRefresher();
+  }
+
+  /**
+   * Adds the {@link DFSInputStream} to the {@link LocatedBlocksRefresher}, so that
+   * the underlying {@link LocatedBlocks} is periodically refreshed.
+   */
+  public void addLocatedBlocksRefresh(DFSInputStream dfsInputStream) {
+    if (isLocatedBlocksRefresherEnabled()) {
+      clientContext.getLocatedBlocksRefresher().addInputStream(dfsInputStream);
+    }
+  }
+
+  /**
+   * Removes the {@link DFSInputStream} from the {@link LocatedBlocksRefresher}, so that
+   * the underlying {@link LocatedBlocks} is no longer periodically refreshed.
+   * @param dfsInputStream the input stream to de-register from periodic refresh
+   */
+  public void removeLocatedBlocksRefresh(DFSInputStream dfsInputStream) {
+    if (isLocatedBlocksRefresherEnabled()) {
+      clientContext.getLocatedBlocksRefresher().removeInputStream(dfsInputStream);
+    }
+  }
+
+  private boolean isLocatedBlocksRefresherEnabled() {
+    return clientContext.isLocatedBlocksRefresherEnabled();
+  }
 }
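
The new DFSClient methods above are the public seam for manual registration, which matters when dfs.client.refresh.read-block-locations.register-automatically is set to false. A hypothetical usage sketch (assumes fs is an already-open DistributedFileSystem; the path is made up):

    // Manual opt-in of a long-lived stream. DistributedFileSystem#getClient and
    // DFSClient#open are existing APIs; the *LocatedBlocksRefresh methods are
    // added by this patch.
    DFSClient client = fs.getClient();
    try (DFSInputStream in = client.open("/data/large-file")) {
      client.addLocatedBlocksRefresh(in);  // begin periodic location refresh
      // ... long-lived reads; locations stay current as replicas move ...
    }  // close() de-registers the stream, so no explicit remove is needed here
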
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java
index 297bbfa..485b6b4 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java
@@ -28,6 +28,7 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.EnumSet;
+import java.util.HashMap;
 import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.List;
@@ -66,6 +67,7 @@ import org.apache.hadoop.hdfs.client.impl.DfsClientConf;
 import org.apache.hadoop.hdfs.protocol.BlockType;
 import org.apache.hadoop.hdfs.protocol.ClientDatanodeProtocol;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfoWithStorage;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
@@ -128,18 +130,18 @@ public class DFSInputStream extends FSInputStream
   private long lastBlockBeingWrittenLength = 0;
   private FileEncryptionInfo fileEncryptionInfo = null;
   protected CachingStrategy cachingStrategy;
+  // this is volatile because it will be polled outside the lock,
+  // but still only updated within the lock
+  private volatile long lastRefreshedBlocksAt = Time.monotonicNow();
   ////
 
+  private AtomicBoolean refreshingBlockLocations = new AtomicBoolean(false);
   protected final ReadStatistics readStatistics = new ReadStatistics();
   // lock for state shared between read and pread
  // Note: Never acquire a lock on <this> with this lock held to avoid deadlocks
   //       (it's OK to acquire this lock when the lock on <this> is held)
   protected final Object infoLock = new Object();
 
-  // refresh locatedBlocks periodically
-  private final long refreshReadBlockIntervals;
-  /** timeStamp of the last time a block location was refreshed. */
-  private long locatedBlocksTimeStamp;
   /**
    * Track the ByteBuffers that we have handed out to readers.
    *
@@ -156,10 +158,6 @@ public class DFSInputStream extends FSInputStream
     return extendedReadBuffers;
   }
 
-  private boolean isPeriodicRefreshEnabled() {
-    return (refreshReadBlockIntervals > 0L);
-  }
-
   /**
    * This variable tracks the number of failures since the start of the
    * most recent user-facing operation. That is to say, it should be reset
@@ -206,9 +204,6 @@ public class DFSInputStream extends FSInputStream
   DFSInputStream(DFSClient dfsClient, String src, boolean verifyChecksum,
       LocatedBlocks locatedBlocks) throws IOException {
     this.dfsClient = dfsClient;
-    this.refreshReadBlockIntervals =
-        this.dfsClient.getRefreshReadBlkLocationsInterval();
-    setLocatedBlocksTimeStamp();
     this.verifyChecksum = verifyChecksum;
     this.src = src;
     synchronized (infoLock) {
@@ -228,19 +223,6 @@ public class DFSInputStream extends FSInputStream
     return deadNodes.containsKey(nodeInfo);
   }
 
-  @VisibleForTesting
-  void setReadTimeStampsForTesting(long timeStamp) {
-    setLocatedBlocksTimeStamp(timeStamp);
-  }
-
-  private void setLocatedBlocksTimeStamp() {
-    setLocatedBlocksTimeStamp(Time.monotonicNow());
-  }
-
-  private void setLocatedBlocksTimeStamp(long timeStamp) {
-    this.locatedBlocksTimeStamp = timeStamp;
-  }
-
   /**
    * Grab the open-file info from namenode
    * @param refreshLocatedBlocks whether to re-fetch locatedblocks
@@ -248,33 +230,50 @@ public class DFSInputStream extends FSInputStream
   void openInfo(boolean refreshLocatedBlocks) throws IOException {
     final DfsClientConf conf = dfsClient.getConf();
     synchronized(infoLock) {
-      lastBlockBeingWrittenLength =
-          fetchLocatedBlocksAndGetLastBlockLength(refreshLocatedBlocks);
      int retriesForLastBlockLength = conf.getRetryTimesForGetLastBlockLength();
-      while (retriesForLastBlockLength > 0) {
+
+      while (true) {
+        LocatedBlocks newLocatedBlocks;
+        if (locatedBlocks == null || refreshLocatedBlocks) {
+          newLocatedBlocks = fetchAndCheckLocatedBlocks(locatedBlocks);
+        } else {
+          newLocatedBlocks = locatedBlocks;
+        }
+
+        long lastBlockLength = getLastBlockLength(newLocatedBlocks);
+        if (lastBlockLength != -1) {
+          setLocatedBlocksFields(newLocatedBlocks, lastBlockLength);
+          return;
+        }
+
         // Getting last block length as -1 is a special case. When cluster
         // restarts, DNs may not report immediately. At this time partial block
         // locations will not be available with NN for getting the length. Lets
         // retry for 3 times to get the length.
-        if (lastBlockBeingWrittenLength == -1) {
-          DFSClient.LOG.warn("Last block locations not available. "
-              + "Datanodes might not have reported blocks completely."
-              + " Will retry for " + retriesForLastBlockLength + " times");
-          waitFor(conf.getRetryIntervalForGetLastBlockLength());
-          lastBlockBeingWrittenLength =
-              fetchLocatedBlocksAndGetLastBlockLength(true);
-        } else {
-          break;
+
+        if (retriesForLastBlockLength-- <= 0) {
+          throw new IOException("Could not obtain the last block locations.");
         }
-        retriesForLastBlockLength--;
-      }
-      if (lastBlockBeingWrittenLength == -1
-          && retriesForLastBlockLength == 0) {
-        throw new IOException("Could not obtain the last block locations.");
+
+        DFSClient.LOG.warn("Last block locations not available. "
+            + "Datanodes might not have reported blocks completely."
+            + " Will retry for " + retriesForLastBlockLength + " times");
+        waitFor(conf.getRetryIntervalForGetLastBlockLength());
       }
     }
   }
 
+  /**
+   * Set locatedBlocks and related fields, using the passed lastBlockLength.
+   * Should be called within infoLock.
+   */
+  private void setLocatedBlocksFields(LocatedBlocks locatedBlocksToSet, long lastBlockLength) {
+    locatedBlocks = locatedBlocksToSet;
+    lastBlockBeingWrittenLength = lastBlockLength;
+    fileEncryptionInfo = locatedBlocks.getFileEncryptionInfo();
+    setLastRefreshedBlocksAt();
+  }
+
   private void waitFor(int waitTime) throws IOException {
     try {
       Thread.sleep(waitTime);
@@ -285,62 +284,18 @@ public class DFSInputStream extends FSInputStream
     }
   }
 
-  /**
-   * Checks whether the block locations timestamps have expired.
-   * In the case of expired timestamp:
-   *    - clear list of deadNodes
-   *    - call openInfo(true) which will re-fetch locatedblocks
-   *    - update locatedBlocksTimeStamp
-   * @return true when the expiration feature is enabled and locatedblocks
-   *         timestamp has expired.
-   * @throws IOException
-   */
-  private boolean isLocatedBlocksExpired() {
-    if (!isPeriodicRefreshEnabled()) {
-      return false;
-    }
-    long now = Time.monotonicNow();
-    long elapsed = now - locatedBlocksTimeStamp;
-    if (elapsed < refreshReadBlockIntervals) {
-      return false;
-    }
-    return true;
-  }
-
-  /**
-   * Update the block locations timestamps if they have expired.
-   * In the case of expired timestamp:
-   *    - clear list of deadNodes
-   *    - call openInfo(true) which will re-fetch locatedblocks
-   *    - update locatedBlocksTimeStamp
-   * @return true when the locatedblocks list is re-fetched from the namenode.
-   * @throws IOException
-   */
-  private boolean updateBlockLocationsStamp() throws IOException {
-    if (!isLocatedBlocksExpired()) {
-      return false;
-    }
-    // clear dead nodes
-    deadNodes.clear();
-    openInfo(true);
-    setLocatedBlocksTimeStamp();
-    return true;
-  }
-
-  private long fetchLocatedBlocksAndGetLastBlockLength(boolean refresh)
+  private LocatedBlocks fetchAndCheckLocatedBlocks(LocatedBlocks existing)
       throws IOException {
-    LocatedBlocks newInfo = locatedBlocks;
-    if (locatedBlocks == null || refresh) {
-      newInfo = dfsClient.getLocatedBlocks(src, 0);
-    }
+    LocatedBlocks newInfo = dfsClient.getLocatedBlocks(src, 0);
+
     DFSClient.LOG.debug("newInfo = {}", newInfo);
     if (newInfo == null) {
       throw new IOException("Cannot open filename " + src);
     }
 
-    if (locatedBlocks != null) {
+    if (existing != null) {
       Iterator<LocatedBlock> oldIter =
-          locatedBlocks.getLocatedBlocks().iterator();
+          existing.getLocatedBlocks().iterator();
       Iterator<LocatedBlock> newIter = newInfo.getLocatedBlocks().iterator();
       while (oldIter.hasNext() && newIter.hasNext()) {
         if (!oldIter.next().getBlock().equals(newIter.next().getBlock())) {
@@ -348,17 +303,14 @@ public class DFSInputStream extends FSInputStream
         }
       }
     }
-    locatedBlocks = newInfo;
-    long lastBlkBeingWrittenLength = getLastBlockLength();
-    fileEncryptionInfo = locatedBlocks.getFileEncryptionInfo();
 
-    return lastBlkBeingWrittenLength;
+    return newInfo;
   }
 
-  private long getLastBlockLength() throws IOException{
+  private long getLastBlockLength(LocatedBlocks blocks) throws IOException{
     long lastBlockBeingWrittenLength = 0;
-    if (!locatedBlocks.isLastBlockComplete()) {
-      final LocatedBlock last = locatedBlocks.getLastLocatedBlock();
+    if (!blocks.isLastBlockComplete()) {
+      final LocatedBlock last = blocks.getLastLocatedBlock();
       if (last != null) {
         if (last.getLocations().length == 0) {
           if (last.getBlockSize() == 0) {
@@ -501,6 +453,14 @@ public class DFSInputStream extends FSInputStream
     return getBlockRange(0, getFileLength());
   }
 
+  protected String getSrc() {
+    return src;
+  }
+
+  protected LocatedBlocks getLocatedBlocks() {
+    return locatedBlocks;
+  }
+
   /**
    * Get block at the specified position.
    * Fetch it from the namenode if not cached.
@@ -543,8 +503,8 @@ public class DFSInputStream extends FSInputStream
   /** Fetch a block from namenode and cache it */
   private LocatedBlock fetchBlockAt(long offset, long length, boolean useCache)
       throws IOException {
+    maybeRegisterBlockRefresh();
     synchronized(infoLock) {
-      updateBlockLocationsStamp();
       int targetBlockIdx = locatedBlocks.findBlock(offset);
       if (targetBlockIdx < 0) { // block is not cached
         targetBlockIdx = LocatedBlocks.getInsertIndex(targetBlockIdx);
@@ -559,8 +519,7 @@ public class DFSInputStream extends FSInputStream
         }
         // Update the LastLocatedBlock, if offset is for last block.
         if (offset >= locatedBlocks.getFileLength()) {
-          locatedBlocks = newBlocks;
-          lastBlockBeingWrittenLength = getLastBlockLength();
+          setLocatedBlocksFields(newBlocks, getLastBlockLength(newBlocks));
         } else {
           locatedBlocks.insertRange(targetBlockIdx,
               newBlocks.getLocatedBlocks());
@@ -587,6 +546,7 @@ public class DFSInputStream extends FSInputStream
       throw new IOException("Offset: " + offset +
         " exceeds file length: " + getFileLength());
     }
+
     synchronized(infoLock) {
       final List<LocatedBlock> blocks;
       final long lengthOfCompleteBlk = locatedBlocks.getFileLength();
@@ -644,6 +604,9 @@ public class DFSInputStream extends FSInputStream
     if (target >= getFileLength()) {
       throw new IOException("Attempted to read past end of file");
     }
+
+    maybeRegisterBlockRefresh();
+
     // Will be getting a new BlockReader.
     closeCurrentBlockReaders();
 
@@ -657,9 +620,6 @@ public class DFSInputStream extends FSInputStream
     boolean connectFailedOnce = false;
 
     while (true) {
-      // Re-fetch the locatedBlocks from NN if the timestamp has expired.
-      updateBlockLocationsStamp();
-
       //
       // Compute desired block
       //
@@ -793,6 +753,7 @@ public class DFSInputStream extends FSInputStream
        * this dfsInputStream anymore.
        */
       dfsClient.removeNodeFromDeadNodeDetector(this, locatedBlocks);
+      maybeDeRegisterBlockRefresh();
     }
   }
 
@@ -871,16 +832,16 @@ public class DFSInputStream extends FSInputStream
     int len = strategy.getTargetLength();
     CorruptedBlocks corruptedBlocks = new CorruptedBlocks();
     failures = 0;
+
+    maybeRegisterBlockRefresh();
+
     if (pos < getFileLength()) {
       int retries = 2;
       while (retries > 0) {
         try {
           // currentNode can be left as null if previous read had a checksum
           // error on the same block. See HDFS-3067
-          // currentNode needs to be updated if the blockLocations timestamp has
-          // expired.
-          if (pos > blockEnd || currentNode == null
-              || updateBlockLocationsStamp()) {
+          if (pos > blockEnd || currentNode == null) {
             currentNode = blockSeekTo(pos);
           }
           int realLen = (int) Math.min(len, (blockEnd - pos + 1L));
@@ -1958,4 +1919,153 @@ public class DFSInputStream extends FSInputStream
       return false;
     }
   }
+
+  /**
+   * Many DFSInputStreams can be opened and closed in quick succession, in which case
+   * they would be registered/deregistered but never need to be refreshed.
+   * Defers registering with the located block refresher, in order to avoid an additional
+   * source of unnecessary synchronization for short-lived DFSInputStreams.
+   */
+  protected void maybeRegisterBlockRefresh() {
+    if (!dfsClient.getConf().isRefreshReadBlockLocationsAutomatically()
+        || !dfsClient.getConf().isLocatedBlocksRefresherEnabled()) {
+      return;
+    }
+
+    if (refreshingBlockLocations.get()) {
+      return;
+    }
+
+    // not enough time elapsed to refresh
+    long timeSinceLastRefresh = Time.monotonicNow() - lastRefreshedBlocksAt;
+    if (timeSinceLastRefresh < dfsClient.getConf().getLocatedBlocksRefresherInterval()) {
+      return;
+    }
+
+    if (!refreshingBlockLocations.getAndSet(true)) {
+      dfsClient.addLocatedBlocksRefresh(this);
+    }
+  }
+
+  /**
+   * De-register periodic refresh of this inputstream, if it was added to begin with.
+   */
+  private void maybeDeRegisterBlockRefresh() {
+    if (refreshingBlockLocations.get()) {
+      dfsClient.removeLocatedBlocksRefresh(this);
+    }
+  }
+
+  /**
+   * Refresh blocks for the input stream, if necessary.
+   *
+   * @param addressCache optional map to use as a cache for resolving datanode InetSocketAddress
+   * @return whether a refresh was performed or not
+   */
+  boolean refreshBlockLocations(Map<String, InetSocketAddress> addressCache) {
+    LocatedBlocks blocks;
+    synchronized (infoLock) {
+      blocks = getLocatedBlocks();
+    }
+
+    if (getLocalDeadNodes().isEmpty() && allBlocksLocal(blocks, addressCache)) {
+      return false;
+    }
+
+    try {
+      DFSClient.LOG.debug("Refreshing {} for path {}", this, getSrc());
+      LocatedBlocks newLocatedBlocks = fetchAndCheckLocatedBlocks(blocks);
+      long lastBlockLength = getLastBlockLength(newLocatedBlocks);
+      if (lastBlockLength == -1) {
+        DFSClient.LOG.debug(
+            "Discarding refreshed blocks for path {} because lastBlockLength was -1",
+            getSrc());
+        return true;
+      }
+
+      setRefreshedValues(newLocatedBlocks, lastBlockLength);
+    } catch (IOException e) {
+      DFSClient.LOG.debug("Failed to refresh DFSInputStream for path {}", getSrc(), e);
+    }
+
+    return true;
+  }
+
+  /**
+   * Once new LocatedBlocks have been fetched, sets them on the DFSInputStream and
+   * updates stateful read location within the necessary locks.
+   */
+  private synchronized void setRefreshedValues(LocatedBlocks blocks, long lastBlockLength)
+      throws IOException {
+    synchronized (infoLock) {
+      setLocatedBlocksFields(blocks, lastBlockLength);
+    }
+
+    getLocalDeadNodes().clear();
+
+    // if a stateful read has been initialized, refresh it
+    if (currentNode != null) {
+      currentNode = blockSeekTo(pos);
+    }
+  }
+
+  private boolean allBlocksLocal(LocatedBlocks blocks,
+      Map<String, InetSocketAddress> addressCache) {
+    if (addressCache == null) {
+      addressCache = new HashMap<>();
+    }
+
+    // we only need to check the first location of each block, because the blocks are already
+    // sorted by distance from the current host
+    for (LocatedBlock lb : blocks.getLocatedBlocks()) {
+      if (lb.getLocations().length == 0) {
+        return false;
+      }
+
+      DatanodeInfoWithStorage location = lb.getLocations()[0];
+      if (location == null) {
+        return false;
+      }
+
+      InetSocketAddress targetAddr = addressCache.computeIfAbsent(
+          location.getDatanodeUuid(),
+          unused -> {
+            String dnAddr = location.getXferAddr(dfsClient.getConf().isConnectToDnViaHostname());
+            return NetUtils.createSocketAddr(
+                dnAddr,
+                -1,
+                null,
+                dfsClient.getConf().isUriCacheEnabled());
+          });
+
+      if (!isResolveableAndLocal(targetAddr)) {
+        return false;
+      }
+    }
+
+    return true;
+  }
+
+  private boolean isResolveableAndLocal(InetSocketAddress targetAddr) {
+    try {
+      return DFSUtilClient.isLocalAddress(targetAddr);
+    } catch (IOException e) {
+      DFSClient.LOG.debug("Got an error checking if {} is local", targetAddr, e);
+      return false;
+    }
+  }
+
+  @VisibleForTesting
+  void setLastRefreshedBlocksAtForTesting(long timestamp) {
+    lastRefreshedBlocksAt = timestamp;
+  }
+
+  @VisibleForTesting
+  long getLastRefreshedBlocksAtForTesting() {
+    return lastRefreshedBlocksAt;
+  }
+
+  private void setLastRefreshedBlocksAt() {
+    lastRefreshedBlocksAt = Time.monotonicNow();
+  }
 }
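
Worth noting in the DFSInputStream changes above: registration is deferred and guarded by an AtomicBoolean plus a monotonic timestamp, so short-lived streams never touch the refresher's synchronization. A simplified model of the maybeRegisterBlockRefresh() logic (not the actual Hadoop code; System.nanoTime() stands in for Time.monotonicNow()):

    import java.util.concurrent.atomic.AtomicBoolean;

    class DeferredRegistration {
      private final AtomicBoolean registered = new AtomicBoolean(false);
      private final long createdAtNanos = System.nanoTime();
      private final long intervalNanos;

      DeferredRegistration(long intervalNanos) {
        this.intervalNanos = intervalNanos;
      }

      void maybeRegister(Runnable registerAction) {
        if (registered.get()) {
          return;  // already registered: a cheap volatile read, no locking
        }
        if (System.nanoTime() - createdAtNanos < intervalNanos) {
          return;  // stream is still young; short-lived streams bail out here
        }
        if (!registered.getAndSet(true)) {
          registerAction.run();  // exactly one caller wins the CAS and registers
        }
      }
    }
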
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSStripedInputStream.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSStripedInputStream.java
index 6158521..ec4ad35 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSStripedInputStream.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSStripedInputStream.java
@@ -143,14 +143,6 @@ public class DFSStripedInputStream extends DFSInputStream {
     return curStripeBuf;
   }
 
-  protected String getSrc() {
-    return src;
-  }
-
-  protected LocatedBlocks getLocatedBlocks() {
-    return locatedBlocks;
-  }
-
   protected ByteBufferPool getBufferPool() {
     return BUFFER_POOL;
   }
@@ -168,6 +160,8 @@ public class DFSStripedInputStream extends DFSInputStream {
       throw new IOException("Attempted to read past end of file");
     }
 
+    maybeRegisterBlockRefresh();
+
     // Will be getting a new BlockReader.
     closeCurrentBlockReaders();
 
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/LocatedBlocksRefresher.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/LocatedBlocksRefresher.java
new file mode 100644
index 0000000..454d1f9
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/LocatedBlocksRefresher.java
@@ -0,0 +1,210 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs;
+
+import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_CONTEXT_DEFAULT;
+import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_REFRESH_READ_BLOCK_LOCATIONS_THREADS_DEFAULT;
+import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_REFRESH_READ_BLOCK_LOCATIONS_THREADS_KEY;
+
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Map;
+import java.util.Set;
+import java.util.WeakHashMap;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Phaser;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.client.impl.DfsClientConf;
+import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
+import org.apache.hadoop.util.Daemon;
+import org.apache.hadoop.util.Time;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Periodically refresh the underlying cached {@link LocatedBlocks} for eligible registered
+ * {@link DFSInputStream}s.  DFSInputStreams are eligible for refreshing if they have any
+ * deadNodes or any blocks are lacking local replicas.
+ * Disabled by default, unless an interval is configured.
+ */
+public class LocatedBlocksRefresher extends Daemon {
+  private static final Logger LOG =
+      LoggerFactory.getLogger(LocatedBlocksRefresher.class);
+
+  private static final String THREAD_PREFIX = "located-block-refresher-";
+
+  private final String name;
+  private final long interval;
+  private final long jitter;
+  private final ExecutorService refreshThreadPool;
+
+  // Use WeakHashMap so that we don't hold onto references that might not have been explicitly
+  // closed because they were created and thrown away.
+  private final Set<DFSInputStream> registeredInputStreams =
+      Collections.newSetFromMap(new WeakHashMap<>());
+
+  private int runCount;
+  private int refreshCount;
+
+  LocatedBlocksRefresher(String name, Configuration conf, DfsClientConf dfsClientConf) {
+    this.name = name;
+    this.interval = dfsClientConf.getLocatedBlocksRefresherInterval();
+    this.jitter = Math.round(this.interval * 0.1);
+    int rpcThreads = conf.getInt(DFS_CLIENT_REFRESH_READ_BLOCK_LOCATIONS_THREADS_KEY,
+        DFS_CLIENT_REFRESH_READ_BLOCK_LOCATIONS_THREADS_DEFAULT);
+
+    String threadPrefix;
+    if (name.equals(DFS_CLIENT_CONTEXT_DEFAULT)) {
+      threadPrefix = THREAD_PREFIX;
+    } else {
+      threadPrefix = THREAD_PREFIX + name + "-";
+    }
+
+    this.refreshThreadPool = Executors.newFixedThreadPool(rpcThreads, new Daemon.DaemonFactory() {
+      private final AtomicInteger threadIndex = new AtomicInteger(0);
+
+      @Override
+      public Thread newThread(Runnable r) {
+        Thread t = super.newThread(r);
+        t.setName(threadPrefix + threadIndex.getAndIncrement());
+        return t;
+      }
+    });
+
+    setName(threadPrefix + "main");
+
+    LOG.info("Start located block refresher for DFSClient {}.", this.name);
+  }
+
+  @Override
+  public void run() {
+    while (!Thread.currentThread().isInterrupted()) {
+
+      if (!waitForInterval()) {
+        return;
+      }
+
+      LOG.debug("Running refresh for {} streams", registeredInputStreams.size());
+      long start = Time.monotonicNow();
+      AtomicInteger neededRefresh = new AtomicInteger(0);
+
+      Phaser phaser = new Phaser(1);
+
+      Map<String, InetSocketAddress> addressCache = new ConcurrentHashMap<>();
+
+      for (DFSInputStream inputStream : getInputStreams()) {
+        phaser.register();
+        refreshThreadPool.submit(() -> {
+          try {
+            if (isInputStreamTracked(inputStream) &&
+                inputStream.refreshBlockLocations(addressCache)) {
+              neededRefresh.incrementAndGet();
+            }
+          } finally {
+            phaser.arriveAndDeregister();
+          }
+        });
+      }
+
+      phaser.arriveAndAwaitAdvance();
+
+      synchronized (this) {
+        runCount++;
+        refreshCount += neededRefresh.get();
+      }
+
+      LOG.debug(
+          "Finished refreshing {} of {} streams in {}ms",
+          neededRefresh,
+          registeredInputStreams.size(),
+          Time.monotonicNow() - start
+      );
+    }
+  }
+
+  public synchronized int getRunCount() {
+    return runCount;
+  }
+
+  public synchronized int getRefreshCount() {
+    return refreshCount;
+  }
+
+  private boolean waitForInterval() {
+    try {
+      Thread.sleep(interval + ThreadLocalRandom.current().nextLong(-jitter, jitter));
+      return true;
+    } catch (InterruptedException e) {
+      LOG.debug("Interrupted during wait interval", e);
+      Thread.currentThread().interrupt();
+      return false;
+    }
+  }
+
+  /**
+   * Shutdown all the threads.
+   */
+  public void shutdown() {
+    if (isAlive()) {
+      interrupt();
+      try {
+        join();
+      } catch (InterruptedException e) {
+      }
+    }
+    refreshThreadPool.shutdown();
+  }
+
+  /**
+   * Collects the DFSInputStreams to a list within synchronization, so that we can iterate them
+   * without potentially blocking callers to {@link #addInputStream(DFSInputStream)} or
+   * {@link #removeInputStream(DFSInputStream)}. We don't care so much about missing additions,
+   * and we'll guard against removals by doing an additional
+   * {@link #isInputStreamTracked(DFSInputStream)} check during iteration.
+   */
+  private synchronized Collection<DFSInputStream> getInputStreams() {
+    return new ArrayList<>(registeredInputStreams);
+  }
+
+  public synchronized void addInputStream(DFSInputStream dfsInputStream) {
+    LOG.trace("Registering {} for {}", dfsInputStream, dfsInputStream.getSrc());
+    registeredInputStreams.add(dfsInputStream);
+  }
+
+  public synchronized void removeInputStream(DFSInputStream dfsInputStream) {
+    if (isInputStreamTracked(dfsInputStream)) {
+      LOG.trace("De-registering {} for {}", dfsInputStream, dfsInputStream.getSrc());
+      registeredInputStreams.remove(dfsInputStream);
+    }
+  }
+
+  public synchronized boolean isInputStreamTracked(DFSInputStream dfsInputStream) {
+    return registeredInputStreams.contains(dfsInputStream);
+  }
+
+  public long getInterval() {
+    return interval;
+  }
+}
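
The refresher's run() method fans work out to the thread pool and waits for each batch with a Phaser, which, unlike a CountDownLatch, does not need the task count up front. A minimal self-contained sketch of that coordination idiom:

    import java.util.List;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Phaser;

    final class PhaserBatch {
      // Submit all tasks, then block until every one of them has finished.
      static void runBatch(ExecutorService pool, List<Runnable> tasks) {
        Phaser phaser = new Phaser(1);      // party 0 is the coordinating thread
        for (Runnable task : tasks) {
          phaser.register();                // one extra party per submitted task
          pool.submit(() -> {
            try {
              task.run();
            } finally {
              phaser.arriveAndDeregister(); // signal completion of this task
            }
          });
        }
        phaser.arriveAndAwaitAdvance();     // returns once all tasks have arrived
      }
    }
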
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/HdfsClientConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/HdfsClientConfigKeys.java
index 6a8a189..cf12793 100755
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/HdfsClientConfigKeys.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/HdfsClientConfigKeys.java
@@ -199,6 +199,19 @@ public interface HdfsClientConfigKeys {
       "dfs.client.refresh.read-block-locations.ms";
   long DFS_CLIENT_REFRESH_READ_BLOCK_LOCATIONS_MS_DEFAULT = 0L;
 
+  //  Number of threads to use for refreshing LocatedBlocks of registered
+  //  DFSInputStreams. If a DFSClient opens many DFSInputStreams, increasing
+  //  this may help refresh them all in a timely manner.
+  String DFS_CLIENT_REFRESH_READ_BLOCK_LOCATIONS_THREADS_KEY =
+      "dfs.client.refresh.read-block-locations.threads";
+  int DFS_CLIENT_REFRESH_READ_BLOCK_LOCATIONS_THREADS_DEFAULT = 5;
+
+  // Whether to auto-register all DFSInputStreams for background refreshes.
+  // If false, user must manually register using DFSClient#addLocatedBlocksRefresh(DFSInputStream)
+  String DFS_CLIENT_REFRESH_READ_BLOCK_LOCATIONS_AUTOMATICALLY_KEY =
+      "dfs.client.refresh.read-block-locations.register-automatically";
+  boolean DFS_CLIENT_REFRESH_READ_BLOCK_LOCATIONS_AUTOMATICALLY_DEFAULT = true;
+
   String  DFS_DATANODE_KERBEROS_PRINCIPAL_KEY =
       "dfs.datanode.kerberos.principal";
   String  DFS_DATANODE_READAHEAD_BYTES_KEY = "dfs.datanode.readahead.bytes";
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/impl/DfsClientConf.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/impl/DfsClientConf.java
index 4ccfedf..5701389 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/impl/DfsClientConf.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/impl/DfsClientConf.java
@@ -144,6 +144,7 @@ public class DfsClientConf {
 
   /** wait time window before refreshing blocklocation for inputstream. */
   private final long refreshReadBlockLocationsMS;
+  private final boolean refreshReadBlockLocationsAutomatically;
 
   private final ShortCircuitConf shortCircuitConf;
   private final int clientShortCircuitNum;
@@ -266,6 +267,10 @@ public class DfsClientConf {
         HdfsClientConfigKeys.
             DFS_CLIENT_REFRESH_READ_BLOCK_LOCATIONS_MS_DEFAULT);
 
+    refreshReadBlockLocationsAutomatically = conf.getBoolean(
+        HdfsClientConfigKeys.DFS_CLIENT_REFRESH_READ_BLOCK_LOCATIONS_AUTOMATICALLY_KEY,
+        HdfsClientConfigKeys.DFS_CLIENT_REFRESH_READ_BLOCK_LOCATIONS_AUTOMATICALLY_DEFAULT);
+
     hedgedReadThresholdMillis = conf.getLong(
         HedgedRead.THRESHOLD_MILLIS_KEY,
         HedgedRead.THRESHOLD_MILLIS_DEFAULT);
@@ -696,13 +701,18 @@ public class DfsClientConf {
     return replicaAccessorBuilderClasses;
   }
 
-  /**
-   * @return the replicaAccessorBuilderClasses
-   */
-  public long getRefreshReadBlockLocationsMS() {
+  public boolean isLocatedBlocksRefresherEnabled() {
+    return refreshReadBlockLocationsMS > 0;
+  }
+
+  public long getLocatedBlocksRefresherInterval() {
     return refreshReadBlockLocationsMS;
   }
 
+  public boolean isRefreshReadBlockLocationsAutomatically() {
+    return refreshReadBlockLocationsAutomatically;
+  }
+
   /**
    * @return the shortCircuitConf
    */
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
index 80c4818..af547fd 100755
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
@@ -3260,6 +3260,25 @@
     </description>
   </property>
 
+  <property>
+    <name>dfs.client.refresh.read-block-locations.register-automatically</name>
+    <value>true</value>
+    <description>
+      Whether to auto-register all DFSInputStreams for background LocatedBlock refreshes.
+      If false, user must manually register using DFSClient#addLocatedBlocksRefresh(DFSInputStream)
+    </description>
+  </property>
+
+  <property>
+    <name>dfs.client.refresh.read-block-locations.threads</name>
+    <value>5</value>
+    <description>
+      Number of threads to use for refreshing LocatedBlocks of registered
+      DFSInputStreams. If a DFSClient opens many DFSInputStreams, increasing
+      this may help refresh them all in a timely manner.
+    </description>
+  </property>
+
 <property>
   <name>dfs.namenode.lease-recheck-interval-ms</name>
   <value>2000</value>
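
For completeness: the interval key dfs.client.refresh.read-block-locations.ms already existed in hdfs-default.xml (default 0, i.e. disabled); the two properties above are the new knobs. A sketch of wiring all three together programmatically (key strings match the patch; every value is illustrative):

    import org.apache.hadoop.conf.Configuration;

    Configuration conf = new Configuration();
    // enable the refresher with a one-minute cycle (illustrative)
    conf.setLong("dfs.client.refresh.read-block-locations.ms", 60_000L);
    // widen the refresh pool for clients with many open streams (illustrative)
    conf.setInt("dfs.client.refresh.read-block-locations.threads", 8);
    // opt out of auto-registration; register via DFSClient#addLocatedBlocksRefresh
    conf.setBoolean(
        "dfs.client.refresh.read-block-locations.register-automatically", false);
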
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSInputStreamBlockLocations.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSInputStreamBlockLocations.java
index 9fed914..50378f6 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSInputStreamBlockLocations.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSInputStreamBlockLocations.java
@@ -21,28 +21,25 @@ package org.apache.hadoop.hdfs;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCK_SIZE_KEY;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNotEquals;
-import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNotSame;
+import static org.junit.Assert.assertSame;
 import static org.junit.Assert.assertTrue;
 
 import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.util.Arrays;
 import java.util.Collection;
-import java.util.List;
+import java.util.HashMap;
+import java.util.Map;
+
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hdfs.MiniDFSCluster.DataNodeProperties;
 import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
-import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
-import org.apache.hadoop.hdfs.protocol.LocatedBlock;
+import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
 import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
 import org.apache.hadoop.hdfs.server.datanode.DataNode;
-import org.apache.hadoop.hdfs.server.datanode.InternalDataNodeTestUtils;
-import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
 import org.apache.hadoop.util.Time;
 import org.junit.After;
-import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 import org.junit.runner.RunWith;
@@ -137,154 +134,111 @@ public class TestDFSInputStreamBlockLocations {
   }
 
   @Test
-  public void testRead() throws Exception {
+  public void testRefreshBlockLocations() throws IOException {
+    final String fileName = "/test_cache_locations";
+    filePath = createFile(fileName);
+
+    try (DFSInputStream fin = dfsClient.open(fileName)) {
+      LocatedBlocks existing = fin.locatedBlocks;
+      long lastRefreshedAt = fin.getLastRefreshedBlocksAtForTesting();
+
+      assertFalse("should not have attempted refresh",
+          fin.refreshBlockLocations(null));
+      assertEquals("should not have updated lastRefreshedAt",
+          lastRefreshedAt, fin.getLastRefreshedBlocksAtForTesting());
+      assertSame("should not have modified locatedBlocks",
+          existing, fin.locatedBlocks);
+
+      // fake a dead node to force refresh
+      // refreshBlockLocations should return true, indicating we attempted a refresh
+      // nothing should be changed, because locations have not changed
+      fin.addToLocalDeadNodes(dfsClient.datanodeReport(DatanodeReportType.LIVE)[0]);
+      assertTrue("should have attempted refresh",
+          fin.refreshBlockLocations(null));
+      verifyChanged(fin, existing, lastRefreshedAt);
+
+      // reset
+      lastRefreshedAt = fin.getLastRefreshedBlocksAtForTesting();
+      existing = fin.locatedBlocks;
+
+      // It's hard to test explicitly for non-local nodes, but we can fake it
+      // because we also treat unresolved as non-local. Pass in a cache where all the datanodes
+      // are unresolved hosts.
+      Map<String, InetSocketAddress> mockAddressCache = new HashMap<>();
+      InetSocketAddress unresolved = InetSocketAddress.createUnresolved("www.google.com", 80);
+      for (DataNode dataNode : dfsCluster.getDataNodes()) {
+        mockAddressCache.put(dataNode.getDatanodeUuid(), unresolved);
+      }
+
+      assertTrue("should have attempted refresh",
+          fin.refreshBlockLocations(mockAddressCache));
+      verifyChanged(fin, existing, lastRefreshedAt);
+    }
+  }
+
+  private void verifyChanged(DFSInputStream fin, LocatedBlocks existing, long lastRefreshedAt) {
+    assertTrue("lastRefreshedAt should have incremented",
+        fin.getLastRefreshedBlocksAtForTesting() > lastRefreshedAt);
+    assertNotSame("located blocks should have changed",
+        existing, fin.locatedBlocks);
+    assertTrue("deadNodes should be empty",
+        fin.getLocalDeadNodes().isEmpty());
+  }
+
+  @Test
+  public void testDeferredRegistrationStatefulRead() throws IOException {
+    testWithRegistrationMethod(DFSInputStream::read);
+  }
+
+  @Test
+  public void testDeferredRegistrationPositionalRead() throws IOException {
+    testWithRegistrationMethod(fin -> fin.readFully(0, new byte[1]));
+  }
+
+  @Test
+  public void testDeferredRegistrationGetAllBlocks() throws IOException {
+    testWithRegistrationMethod(DFSInputStream::getAllBlocks);
+  }
+
+  @FunctionalInterface
+  interface ThrowingConsumer {
+    void accept(DFSInputStream fin) throws IOException;
+  }
+
+  private void testWithRegistrationMethod(ThrowingConsumer registrationMethod) throws IOException {
     final String fileName = "/test_cache_locations";
-    filePath = new Path(fileName);
+    filePath = createFile(fileName);
+
     DFSInputStream fin = null;
-    FSDataOutputStream fout = null;
     try {
-      // create a file and write for testing
-      fout = fs.create(filePath, REPLICATION_FACTOR);
-      fout.write(new byte[(fileLength)]);
-      // finalize the file by closing the output stream
-      fout.close();
-      fout = null;
-      // get the located blocks
-      LocatedBlocks referenceLocatedBlocks =
-          dfsClient.getLocatedBlocks(fileName, 0, fileLength);
-      assertEquals(numOfBlocks, referenceLocatedBlocks.locatedBlockCount());
-      String poolId = dfsCluster.getNamesystem().getBlockPoolId();
       fin = dfsClient.open(fileName);
-      // get the located blocks from fin
-      LocatedBlocks finLocatedBlocks = fin.locatedBlocks;
-      assertEquals(dfsClientPrefetchSize / BLOCK_SIZE,
-          finLocatedBlocks.locatedBlockCount());
-      final int chunkReadSize = BLOCK_SIZE / 4;
-      byte[] readBuffer = new byte[chunkReadSize];
-      // read the first block
-      DatanodeInfo prevDNInfo = null;
-      DatanodeInfo currDNInfo = null;
-      int bytesRead = 0;
-      int firstBlockMark = BLOCK_SIZE;
-      // get the second block locations
-      LocatedBlock firstLocatedBlk =
-          fin.locatedBlocks.getLocatedBlocks().get(0);
-      DatanodeInfo[] firstBlkDNInfos = firstLocatedBlk.getLocations();
-      while (fin.getPos() < firstBlockMark) {
-        bytesRead = fin.read(readBuffer);
-        Assert.assertTrue("Unexpected number of read bytes",
-            chunkReadSize >= bytesRead);
-        if (currDNInfo == null) {
-          currDNInfo = fin.getCurrentDatanode();
-          assertNotNull("current FIS datanode is null", currDNInfo);
-          continue;
-        }
-        prevDNInfo = currDNInfo;
-        currDNInfo = fin.getCurrentDatanode();
-        assertEquals("the DFSInput stream does not read from same node",
-            prevDNInfo, currDNInfo);
-      }
+      assertFalse("should not be tracking input stream on open",
+          dfsClient.getLocatedBlockRefresher().isInputStreamTracked(fin));
 
-      assertEquals("InputStream exceeds expected position",
-          firstBlockMark, fin.getPos());
-      // get the second block locations
-      LocatedBlock secondLocatedBlk =
-          fin.locatedBlocks.getLocatedBlocks().get(1);
-      // get the nodeinfo for that block
-      DatanodeInfo[] secondBlkDNInfos = secondLocatedBlk.getLocations();
-      DatanodeInfo deadNodeInfo = secondBlkDNInfos[0];
-      // stop the datanode in the list of the
-      DataNode deadNode = getdataNodeFromHostName(dfsCluster,
-          deadNodeInfo.getHostName());
-      // Shutdown and wait for datanode to be marked dead
-      DatanodeRegistration reg = InternalDataNodeTestUtils.
-          getDNRegistrationForBP(dfsCluster.getDataNodes().get(0), poolId);
-      DataNodeProperties stoppedDNProps =
-          dfsCluster.stopDataNode(deadNodeInfo.getName());
+      // still not registered because it hasn't been an hour by the time we call this
+      registrationMethod.accept(fin);
+      assertFalse("should not be tracking input stream after first read",
+          dfsClient.getLocatedBlockRefresher().isInputStreamTracked(fin));
 
-      List<DataNode> datanodesPostStoppage = dfsCluster.getDataNodes();
-      assertEquals(NUM_DATA_NODES - 1, datanodesPostStoppage.size());
-      // get the located blocks
-      LocatedBlocks afterStoppageLocatedBlocks =
-          dfsClient.getLocatedBlocks(fileName, 0, fileLength);
-      // read second block
-      int secondBlockMark =  (int) (1.5 * BLOCK_SIZE);
-      boolean firstIteration = true;
-      if (this.enableBlkExpiration) {
-        // set the time stamps to make sure that we do not refresh locations yet
-        fin.setReadTimeStampsForTesting(Time.monotonicNow());
-      }
-      while (fin.getPos() < secondBlockMark) {
-        bytesRead = fin.read(readBuffer);
-        assertTrue("dead node used to read at position: " + fin.getPos(),
-            fin.deadNodesContain(deadNodeInfo));
-        Assert.assertTrue("Unexpected number of read bytes",
-            chunkReadSize >= bytesRead);
-        prevDNInfo = currDNInfo;
-        currDNInfo = fin.getCurrentDatanode();
-        assertNotEquals(deadNodeInfo, currDNInfo);
-        if (firstIteration) {
-          // currDNInfo has to be different unless first block locs is different
-          assertFalse("FSInputStream should pick a different DN",
-              firstBlkDNInfos[0].equals(deadNodeInfo)
-                  && prevDNInfo.equals(currDNInfo));
-          firstIteration = false;
-        }
-      }
-      assertEquals("InputStream exceeds expected position",
-          secondBlockMark, fin.getPos());
-      // restart the dead node with the same port
-      assertTrue(dfsCluster.restartDataNode(stoppedDNProps, true));
-      dfsCluster.waitActive();
-      List<DataNode> datanodesPostRestart = dfsCluster.getDataNodes();
-      assertEquals(NUM_DATA_NODES, datanodesPostRestart.size());
-      // continue reading from block 2 again. We should read from deadNode
-      int thirdBlockMark =  2 * BLOCK_SIZE;
-      firstIteration = true;
-      while (fin.getPos() < thirdBlockMark) {
-        bytesRead = fin.read(readBuffer);
-        if (this.enableBlkExpiration) {
-          assertEquals("node is removed from deadNodes after 1st iteration",
-              firstIteration, fin.deadNodesContain(deadNodeInfo));
-        } else {
-          assertTrue(fin.deadNodesContain(deadNodeInfo));
-        }
-        Assert.assertTrue("Unexpected number of read bytes",
-            chunkReadSize >= bytesRead);
-        prevDNInfo = currDNInfo;
-        currDNInfo = fin.getCurrentDatanode();
-        if (!this.enableBlkExpiration) {
-          assertNotEquals(deadNodeInfo, currDNInfo);
-        }
-        if (firstIteration) {
-          assertEquals(prevDNInfo, currDNInfo);
-          firstIteration = false;
-          if (this.enableBlkExpiration) {
-            // reset the time stamps of located blocks to force cache expiration
-            fin.setReadTimeStampsForTesting(
-                Time.monotonicNow() - (dfsInputLocationsTimeout + 1));
-          }
-        }
-      }
-      assertEquals("InputStream exceeds expected position",
-          thirdBlockMark, fin.getPos());
+      // artificially make it have been an hour
+      fin.setLastRefreshedBlocksAtForTesting(Time.monotonicNow() - (dfsInputLocationsTimeout + 1));
+      registrationMethod.accept(fin);
+      assertEquals("SHOULD be tracking input stream on read after interval, only if enabled",
+          enableBlkExpiration, dfsClient.getLocatedBlockRefresher().isInputStreamTracked(fin));
     } finally {
-      if (fout != null) {
-        fout.close();
-      }
       if (fin != null) {
         fin.close();
+        assertFalse(dfsClient.getLocatedBlockRefresher().isInputStreamTracked(fin));
       }
+      fs.delete(filePath, true);
     }
   }
 
-  private DataNode getdataNodeFromHostName(MiniDFSCluster cluster,
-      String hostName) {
-    for (DataNode dn : cluster.getDataNodes()) {
-      if (dn.getDatanodeId().getHostName().equals(hostName)) {
-        return dn;
-      }
+  private Path createFile(String fileName) throws IOException {
+    Path path = new Path(fileName);
+    try (FSDataOutputStream fout = fs.create(path, REPLICATION_FACTOR)) {
+      fout.write(new byte[(fileLength)]);
     }
-    return null;
+    return path;
   }
-}
\ No newline at end of file
+}
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestLocatedBlocksRefresher.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestLocatedBlocksRefresher.java
new file mode 100644
index 0000000..5d8c479
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestLocatedBlocksRefresher.java
@@ -0,0 +1,266 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs;
+
+import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_BLOCK_SIZE_KEY;
+import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_REFRESH_READ_BLOCK_LOCATIONS_MS_KEY;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNotSame;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertSame;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.UUID;
+import java.util.function.Supplier;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.MiniDFSCluster.DataNodeProperties;
+import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
+import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
+import org.apache.hadoop.hdfs.server.datanode.DataNode;
+import org.apache.hadoop.util.Time;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class TestLocatedBlocksRefresher {
+  private static final Logger LOG = LoggerFactory.getLogger(TestLocatedBlocksRefresher.class);
+
+  private static final int BLOCK_SIZE = 1024 * 1024;
+  private static final short REPLICATION_FACTOR = (short) 4;
+  private static final String[] RACKS = new String[] {
+      "/d1/r1", "/d1/r1", "/d1/r2", "/d1/r2", "/d1/r2", "/d2/r3", "/d2/r3" };
+  private static final int NUM_DATA_NODES = RACKS.length;
+
+  private final int numOfBlocks = 24;
+  private final int fileLength = numOfBlocks * BLOCK_SIZE;
+  private final int dfsClientPrefetchSize = fileLength / 2;
+
+  private MiniDFSCluster cluster;
+  private Configuration conf;
+
+  @Before
+  public void setUp() throws Exception {
+    cluster = null;
+    conf = new HdfsConfiguration();
+
+    // disable shortcircuit reading
+    conf.setBoolean(HdfsClientConfigKeys.Read.ShortCircuit.KEY, false);
+    // set replication factor
+    conf.setInt(DFSConfigKeys.DFS_REPLICATION_KEY, REPLICATION_FACTOR);
+    // set block size and other sizes
+    conf.setLong(DFS_BLOCK_SIZE_KEY, BLOCK_SIZE);
+    conf.setLong(HdfsClientConfigKeys.Read.PREFETCH_SIZE_KEY,
+        dfsClientPrefetchSize);
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    if (cluster != null) {
+      cluster.shutdown(true, true);
+    }
+  }
+
+  private void setupTest(long refreshInterval) throws IOException {
+    conf.setLong(DFS_CLIENT_REFRESH_READ_BLOCK_LOCATIONS_MS_KEY, refreshInterval);
+
+    // this is necessary to ensure no caching between runs
+    conf.set("dfs.client.context", UUID.randomUUID().toString());
+
+    cluster = new MiniDFSCluster.Builder(conf).numDataNodes(NUM_DATA_NODES).racks(RACKS).build();
+    cluster.waitActive();
+  }
+
+  @Test
+  public void testDisabledOnZeroInterval() throws IOException {
+    setupTest(0);
+    assertNull(cluster.getFileSystem().getClient().getLocatedBlockRefresher());
+  }
+
+  @Test
+  public void testEnabledOnNonZeroInterval() throws Exception {
+    setupTest(1000);
+    LocatedBlocksRefresher refresher =
+        cluster.getFileSystem().getClient().getLocatedBlockRefresher();
+    assertNotNull(refresher);
+    assertNoMoreRefreshes(refresher);
+  }
+
+  @Test
+  public void testRefreshOnDeadNodes() throws Exception {
+    setupTest(1000);
+    DistributedFileSystem fs = cluster.getFileSystem();
+    DFSClient client = fs.getClient();
+    LocatedBlocksRefresher refresher = client.getLocatedBlockRefresher();
+
+    String fileName = createTestFile(fs);
+
+    try (DFSInputStream fin = client.open(fileName)) {
+      LocatedBlocks locatedBlocks = fin.locatedBlocks;
+      assertEquals(dfsClientPrefetchSize / BLOCK_SIZE,
+          locatedBlocks.locatedBlockCount());
+
+      // should not be tracked yet
+      assertFalse(refresher.isInputStreamTracked(fin));
+
+      // track and verify
+      refresher.addInputStream(fin);
+      assertTrue(refresher.isInputStreamTracked(fin));
+
+      // no refreshes yet, as nothing has happened
+      assertNoMoreRefreshes(refresher);
+      synchronized (fin.infoLock) {
+        assertSame(locatedBlocks, fin.locatedBlocks);
+      }
+
+      stopNodeHostingBlocks(fin, NUM_DATA_NODES - 1);
+
+      // read blocks, which should trigger dead node for the one we stopped
+      int chunkReadSize = BLOCK_SIZE / 4;
+      byte[] readBuffer = new byte[chunkReadSize];
+      fin.read(0, readBuffer, 0, readBuffer.length);
+
+      assertEquals(1, fin.getLocalDeadNodes().size());
+
+      // we should get a refresh now
+      assertRefreshes(refresher, 1);
+
+      // verify that it actually changed things
+      synchronized (fin.infoLock) {
+        assertNotSame(locatedBlocks, fin.locatedBlocks);
+        assertTrue(fin.getLocalDeadNodes().isEmpty());
+      }
+
+      // no more refreshes because everything is happy again
+      assertNoMoreRefreshes(refresher);
+
+      // stop another node, and try to trigger a new deadNode
+      stopNodeHostingBlocks(fin, NUM_DATA_NODES - 2);
+      readBuffer = new byte[chunkReadSize];
+      fin.read(0, readBuffer, 0, readBuffer.length);
+
+      // we should refresh again now, and verify
+      // may actually be more than 1, since the first dead node
+      // may still be listed in the replicas for the block
+      assertTrue(fin.getLocalDeadNodes().size() > 0);
+
+      assertRefreshes(refresher, 1);
+
+      synchronized (fin.infoLock) {
+        assertNotSame(locatedBlocks, fin.locatedBlocks);
+        assertTrue(fin.getLocalDeadNodes().isEmpty());
+      }
+
+      // de-register, and expect no more refreshes below
+      refresher.removeInputStream(fin);
+    }
+
+    assertNoMoreRefreshes(refresher);
+  }
+
+  private void stopNodeHostingBlocks(DFSInputStream fin, int expectedNodes) {
+    synchronized (fin.infoLock) {
+      int idx = fin.locatedBlocks.findBlock(0);
+      for (int i = 0; i < REPLICATION_FACTOR; i++) {
+        String deadNodeAddr = fin.locatedBlocks.get(idx).getLocations()[i].getXferAddr();
+
+        DataNodeProperties dataNodeProperties = cluster.stopDataNode(deadNodeAddr);
+        if (dataNodeProperties != null) {
+          List<DataNode> datanodesPostStoppage = cluster.getDataNodes();
+          assertEquals(expectedNodes, datanodesPostStoppage.size());
+          return;
+        }
+      }
+
+      throw new RuntimeException("Could not find a datanode to stop");
+    }
+  }
+
+  private void assertNoMoreRefreshes(LocatedBlocksRefresher refresher) throws InterruptedException {
+    long interval = refresher.getInterval();
+    int runCount = refresher.getRunCount();
+    int refreshCount = refresher.getRefreshCount();
+
+    LOG.info("Waiting for at least {} runs, from current {}, expecting no refreshes",
+        runCount + 3, runCount);
+    // wait for it to run 3 times, with some buffer
+    awaitWithTimeout(() -> refresher.getRunCount() > runCount + 3, 5 * interval);
+
+    // it should not have refreshed anything, because no DFSInputStreams registered anymore
+    assertEquals(refreshCount, refresher.getRefreshCount());
+  }
+
+  private void assertRefreshes(LocatedBlocksRefresher refresher, int expectedRefreshes)
+      throws InterruptedException {
+    int runCount = refresher.getRunCount();
+    int refreshCount = refresher.getRefreshCount();
+    int expectedRuns = 3;
+
+    if (expectedRefreshes < 0) {
+      expectedRefreshes = expectedRuns;
+    }
+
+    LOG.info(
+        "Waiting for at least {} runs, from current {}. Expecting {} refreshes, from current {}",
+        runCount + expectedRuns, runCount, refreshCount + expectedRefreshes, refreshCount
+    );
+
+    // wait for it to run 3 times
+    awaitWithTimeout(() -> refresher.getRunCount() >= runCount + expectedRuns, 10_000);
+
+    // the values may not be identical due to any refreshes that occurred before we opened
+    // the DFSInputStream but the difference should be identical since we are refreshing
+    // every time
+    assertEquals(expectedRefreshes, refresher.getRefreshCount() - refreshCount);
+  }
+
+  private void awaitWithTimeout(Supplier<Boolean> test, long timeoutMillis)
+      throws InterruptedException {
+    long now = Time.monotonicNow();
+
+    while(!test.get()) {
+      if (Time.monotonicNow() - now > timeoutMillis) {
+        fail("Timed out waiting for true condition");
+        return;
+      }
+
+      Thread.sleep(50);
+    }
+  }
+
+  private String createTestFile(FileSystem fs) throws IOException {
+    String fileName = "/located_blocks_" + UUID.randomUUID().toString();
+    Path filePath = new Path(fileName);
+    try (FSDataOutputStream fout = fs.create(filePath, REPLICATION_FACTOR)) {
+      fout.write(new byte[(fileLength)]);
+    }
+    fs.deleteOnExit(filePath);
+
+    return fileName;
+  }
+}
