This is an automated email from the ASF dual-hosted git repository. kihwal pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/hadoop.git
The following commit(s) were added to refs/heads/trunk by this push: new d10f77e HDFS-15119. Allow expiration of cached locations in DFSInputStream. Contributed by Ahmed Hussein. d10f77e is described below commit d10f77e3c91225f86ed9c0f0e6a9adf2e1434674 Author: Kihwal Lee <kih...@apache.org> AuthorDate: Fri Jan 24 09:15:27 2020 -0600 HDFS-15119. Allow expiration of cached locations in DFSInputStream. Contributed by Ahmed Hussein. --- .../java/org/apache/hadoop/hdfs/DFSClient.java | 4 + .../org/apache/hadoop/hdfs/DFSInputStream.java | 92 ++++++- .../hadoop/hdfs/client/HdfsClientConfigKeys.java | 5 + .../hadoop/hdfs/client/impl/DfsClientConf.java | 15 ++ .../src/main/resources/hdfs-default.xml | 8 + .../hdfs/TestDFSInputStreamBlockLocations.java | 290 +++++++++++++++++++++ 6 files changed, 408 insertions(+), 6 deletions(-) diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClient.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClient.java index 9bb28f1..e0eaa19 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClient.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClient.java @@ -844,6 +844,10 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory, namenode.reportBadBlocks(blocks); } + public long getRefreshReadBlkLocationsInterval() { + return dfsClientConf.getRefreshReadBlockLocationsMS(); + } + public LocatedBlocks getLocatedBlocks(String src, long start) throws IOException { return getLocatedBlocks(src, start, dfsClientConf.getPrefetchSize()); diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java index 9827534..af9891a 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java @@ -86,6 +86,7 @@ import org.apache.hadoop.security.token.Token; import org.apache.hadoop.util.IdentityHashStore; import org.apache.hadoop.util.StopWatch; import org.apache.hadoop.util.StringUtils; +import org.apache.hadoop.util.Time; import com.google.common.annotations.VisibleForTesting; @@ -135,6 +136,10 @@ public class DFSInputStream extends FSInputStream // (it's OK to acquire this lock when the lock on <this> is held) protected final Object infoLock = new Object(); + // refresh locatedBlocks periodically + private final long refreshReadBlockIntervals; + /** timeStamp of the last time a block location was refreshed. */ + private long locatedBlocksTimeStamp; /** * Track the ByteBuffers that we have handed out to readers. * @@ -151,6 +156,10 @@ public class DFSInputStream extends FSInputStream return extendedReadBuffers; } + private boolean isPeriodicRefreshEnabled() { + return (refreshReadBlockIntervals > 0L); + } + /** * This variable tracks the number of failures since the start of the * most recent user-facing operation. That is to say, it should be reset @@ -164,7 +173,7 @@ public class DFSInputStream extends FSInputStream */ protected int failures = 0; - /* XXX Use of CocurrentHashMap is temp fix. Need to fix + /* XXX Use of ConcurrentHashMap is temp fix. Need to fix * parallel accesses to DFSInputStream (through ptreads) properly */ private final ConcurrentHashMap<DatanodeInfo, DatanodeInfo> deadNodes = new ConcurrentHashMap<>(); @@ -194,6 +203,9 @@ public class DFSInputStream extends FSInputStream DFSInputStream(DFSClient dfsClient, String src, boolean verifyChecksum, LocatedBlocks locatedBlocks) throws IOException { this.dfsClient = dfsClient; + this.refreshReadBlockIntervals = + this.dfsClient.getRefreshReadBlkLocationsInterval(); + setLocatedBlocksTimeStamp(); this.verifyChecksum = verifyChecksum; this.src = src; synchronized (infoLock) { @@ -204,10 +216,28 @@ public class DFSInputStream extends FSInputStream } @VisibleForTesting - public long getlastBlockBeingWrittenLengthForTesting() { + long getlastBlockBeingWrittenLengthForTesting() { return lastBlockBeingWrittenLength; } + @VisibleForTesting + boolean deadNodesContain(DatanodeInfo nodeInfo) { + return deadNodes.containsKey(nodeInfo); + } + + @VisibleForTesting + void setReadTimeStampsForTesting(long timeStamp) { + setLocatedBlocksTimeStamp(timeStamp); + } + + private void setLocatedBlocksTimeStamp() { + setLocatedBlocksTimeStamp(Time.monotonicNow()); + } + + private void setLocatedBlocksTimeStamp(long timeStamp) { + this.locatedBlocksTimeStamp = timeStamp; + } + /** * Grab the open-file info from namenode * @param refreshLocatedBlocks whether to re-fetch locatedblocks @@ -252,6 +282,48 @@ public class DFSInputStream extends FSInputStream } } + /** + * Checks whether the block locations timestamps have expired. + * In the case of expired timestamp: + * - clear list of deadNodes + * - call openInfo(true) which will re-fetch locatedblocks + * - update locatedBlocksTimeStamp + * @return true when the expiration feature is enabled and locatedblocks + * timestamp has expired. + * @throws IOException + */ + private boolean isLocatedBlocksExpired() { + if (!isPeriodicRefreshEnabled()) { + return false; + } + long now = Time.monotonicNow(); + long elapsed = now - locatedBlocksTimeStamp; + if (elapsed < refreshReadBlockIntervals) { + return false; + } + return true; + } + + /** + * Update the block locations timestamps if they have expired. + * In the case of expired timestamp: + * - clear list of deadNodes + * - call openInfo(true) which will re-fetch locatedblocks + * - update locatedBlocksTimeStamp + * @return true when the locatedblocks list is re-fetched from the namenode. + * @throws IOException + */ + private boolean updateBlockLocationsStamp() throws IOException { + if (!isLocatedBlocksExpired()) { + return false; + } + // clear dead nodes + deadNodes.clear(); + openInfo(true); + setLocatedBlocksTimeStamp(); + return true; + } + private long fetchLocatedBlocksAndGetLastBlockLength(boolean refresh) throws IOException { LocatedBlocks newInfo = locatedBlocks; @@ -264,7 +336,8 @@ public class DFSInputStream extends FSInputStream } if (locatedBlocks != null) { - Iterator<LocatedBlock> oldIter = locatedBlocks.getLocatedBlocks().iterator(); + Iterator<LocatedBlock> oldIter = + locatedBlocks.getLocatedBlocks().iterator(); Iterator<LocatedBlock> newIter = newInfo.getLocatedBlocks().iterator(); while (oldIter.hasNext() && newIter.hasNext()) { if (!oldIter.next().getBlock().equals(newIter.next().getBlock())) { @@ -468,6 +541,7 @@ public class DFSInputStream extends FSInputStream private LocatedBlock fetchBlockAt(long offset, long length, boolean useCache) throws IOException { synchronized(infoLock) { + updateBlockLocationsStamp(); int targetBlockIdx = locatedBlocks.findBlock(offset); if (targetBlockIdx < 0) { // block is not cached targetBlockIdx = LocatedBlocks.getInsertIndex(targetBlockIdx); @@ -567,7 +641,6 @@ public class DFSInputStream extends FSInputStream if (target >= getFileLength()) { throw new IOException("Attempted to read past end of file"); } - // Will be getting a new BlockReader. closeCurrentBlockReaders(); @@ -581,9 +654,13 @@ public class DFSInputStream extends FSInputStream boolean connectFailedOnce = false; while (true) { + // Re-fetch the locatedBlocks from NN if the timestamp has expired. + updateBlockLocationsStamp(); + // // Compute desired block // + LocatedBlock targetBlock = getBlockAt(target); // update current position @@ -797,7 +874,10 @@ public class DFSInputStream extends FSInputStream try { // currentNode can be left as null if previous read had a checksum // error on the same block. See HDFS-3067 - if (pos > blockEnd || currentNode == null) { + // currentNode needs to be updated if the blockLocations timestamp has + // expired. + if (pos > blockEnd || currentNode == null + || updateBlockLocationsStamp()) { currentNode = blockSeekTo(pos); } int realLen = (int) Math.min(len, (blockEnd - pos + 1L)); @@ -1538,7 +1618,7 @@ public class DFSInputStream extends FSInputStream * the current datanode and might connect to the same node. */ private boolean seekToBlockSource(long targetPos) - throws IOException { + throws IOException { currentNode = blockSeekTo(targetPos); return true; } diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/HdfsClientConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/HdfsClientConfigKeys.java index fec3958..38f0016 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/HdfsClientConfigKeys.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/HdfsClientConfigKeys.java @@ -191,6 +191,11 @@ public interface HdfsClientConfigKeys { long DFS_CLIENT_DEAD_NODE_DETECTION_PROBE_SUSPECT_NODE_INTERVAL_MS_DEFAULT = 300; // 300ms + // refreshing LocatedBlocks period. A value of 0 disables the feature. + String DFS_CLIENT_REFRESH_READ_BLOCK_LOCATIONS_MS_KEY = + "dfs.client.refresh.read-block-locations.ms"; + long DFS_CLIENT_REFRESH_READ_BLOCK_LOCATIONS_MS_DEFAULT = 0L; + String DFS_DATANODE_KERBEROS_PRINCIPAL_KEY = "dfs.datanode.kerberos.principal"; String DFS_DATANODE_READAHEAD_BYTES_KEY = "dfs.datanode.readahead.bytes"; diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/impl/DfsClientConf.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/impl/DfsClientConf.java index 804375c..d6194f4 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/impl/DfsClientConf.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/impl/DfsClientConf.java @@ -136,6 +136,9 @@ public class DfsClientConf { private final long datanodeRestartTimeout; private final long slowIoWarningThresholdMs; + /** wait time window before refreshing blocklocation for inputstream. */ + private final long refreshReadBlockLocationsMS; + private final ShortCircuitConf shortCircuitConf; private final long hedgedReadThresholdMillis; @@ -257,6 +260,11 @@ public class DfsClientConf { DFS_CLIENT_SLOW_IO_WARNING_THRESHOLD_KEY, DFS_CLIENT_SLOW_IO_WARNING_THRESHOLD_DEFAULT); + refreshReadBlockLocationsMS = conf.getLong( + HdfsClientConfigKeys.DFS_CLIENT_REFRESH_READ_BLOCK_LOCATIONS_MS_KEY, + HdfsClientConfigKeys. + DFS_CLIENT_REFRESH_READ_BLOCK_LOCATIONS_MS_DEFAULT); + shortCircuitConf = new ShortCircuitConf(conf); hedgedReadThresholdMillis = conf.getLong( @@ -619,6 +627,13 @@ public class DfsClientConf { } /** + * @return the replicaAccessorBuilderClasses + */ + public long getRefreshReadBlockLocationsMS() { + return refreshReadBlockLocationsMS; + } + + /** * @return the shortCircuitConf */ public ShortCircuitConf getShortCircuitConf() { diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml index 6c54d85..344f7ea 100755 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml @@ -3114,6 +3114,14 @@ </description> </property> + <property> + <name>dfs.client.refresh.read-block-locations.ms</name> + <value>0</value> + <description> + Refreshing LocatedBlocks period. A value of 0 disables the feature. + </description> + </property> + <property> <name>dfs.namenode.lease-recheck-interval-ms</name> <value>2000</value> diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSInputStreamBlockLocations.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSInputStreamBlockLocations.java new file mode 100644 index 0000000..9fed914 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSInputStreamBlockLocations.java @@ -0,0 +1,290 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hdfs; + +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCK_SIZE_KEY; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; + +import java.io.IOException; +import java.net.InetSocketAddress; +import java.util.Arrays; +import java.util.Collection; +import java.util.List; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hdfs.MiniDFSCluster.DataNodeProperties; +import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys; +import org.apache.hadoop.hdfs.protocol.DatanodeInfo; +import org.apache.hadoop.hdfs.protocol.LocatedBlock; +import org.apache.hadoop.hdfs.protocol.LocatedBlocks; +import org.apache.hadoop.hdfs.server.datanode.DataNode; +import org.apache.hadoop.hdfs.server.datanode.InternalDataNodeTestUtils; +import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration; +import org.apache.hadoop.util.Time; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +/** + * Test the caches expiration of the block locations. + */ +@RunWith(Parameterized.class) +public class TestDFSInputStreamBlockLocations { + private static final int BLOCK_SIZE = 1024 * 1024; + private static final String[] RACKS = new String[] { + "/d1/r1", "/d1/r1", "/d1/r2", "/d1/r2", "/d1/r2", "/d2/r3", "/d2/r3" }; + private static final int NUM_DATA_NODES = RACKS.length; + private static final short REPLICATION_FACTOR = (short) 4; + private final int staleInterval = 8000; + private final int numOfBlocks = 24; + private final int fileLength = numOfBlocks * BLOCK_SIZE; + private final int dfsClientPrefetchSize = fileLength / 2; + // locatedBlocks expiration set to 1 hour + private final long dfsInputLocationsTimeout = 60 * 60 * 1000L; + + private HdfsConfiguration conf; + private MiniDFSCluster dfsCluster; + private DFSClient dfsClient; + private DistributedFileSystem fs; + private Path filePath; + private boolean enableBlkExpiration; + + @Parameterized.Parameters(name = "{index}: CacheExpirationConfig(Enable {0})") + public static Collection<Object[]> getTestParameters() { + return Arrays.asList(new Object[][] { + {Boolean.TRUE}, + {Boolean.FALSE} + }); + } + + public TestDFSInputStreamBlockLocations(Boolean enableExpiration) { + enableBlkExpiration = enableExpiration; + } + + @Before + public void setup() throws IOException { + conf = new HdfsConfiguration(); + conf.setBoolean( + DFSConfigKeys.DFS_NAMENODE_AVOID_STALE_DATANODE_FOR_READ_KEY, true); + // set the heartbeat intervals and stale considerations + conf.setLong(DFSConfigKeys.DFS_NAMENODE_STALE_DATANODE_INTERVAL_KEY, + staleInterval); + conf.setInt(DFSConfigKeys.DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY, + staleInterval / 2); + // disable shortcircuit reading + conf.setBoolean(HdfsClientConfigKeys.Read.ShortCircuit.KEY, false); + // set replication factor + conf.setInt(DFSConfigKeys.DFS_REPLICATION_KEY, REPLICATION_FACTOR); + // set block size and other sizes + conf.setLong(DFS_BLOCK_SIZE_KEY, BLOCK_SIZE); + conf.setLong(HdfsClientConfigKeys.Read.PREFETCH_SIZE_KEY, + dfsClientPrefetchSize); + if (enableBlkExpiration) { + // set the refresh locations for every dfsInputLocationsTimeout + conf.setLong( + HdfsClientConfigKeys.DFS_CLIENT_REFRESH_READ_BLOCK_LOCATIONS_MS_KEY, + dfsInputLocationsTimeout); + } + // start the cluster and create a DFSClient + dfsCluster = new MiniDFSCluster.Builder(conf) + .numDataNodes(NUM_DATA_NODES).racks(RACKS).build(); + dfsCluster.waitActive(); + assertEquals(NUM_DATA_NODES, dfsCluster.getDataNodes().size()); + InetSocketAddress addr = new InetSocketAddress("localhost", + dfsCluster.getNameNodePort()); + dfsClient = new DFSClient(addr, conf); + fs = dfsCluster.getFileSystem(); + } + + @After + public void teardown() throws IOException { + if (dfsClient != null) { + dfsClient.close(); + dfsClient = null; + } + if (fs != null) { + fs.deleteOnExit(filePath); + fs.close(); + fs = null; + } + if (dfsCluster != null) { + dfsCluster.shutdown(); + dfsCluster = null; + } + } + + @Test + public void testRead() throws Exception { + final String fileName = "/test_cache_locations"; + filePath = new Path(fileName); + DFSInputStream fin = null; + FSDataOutputStream fout = null; + try { + // create a file and write for testing + fout = fs.create(filePath, REPLICATION_FACTOR); + fout.write(new byte[(fileLength)]); + // finalize the file by closing the output stream + fout.close(); + fout = null; + // get the located blocks + LocatedBlocks referenceLocatedBlocks = + dfsClient.getLocatedBlocks(fileName, 0, fileLength); + assertEquals(numOfBlocks, referenceLocatedBlocks.locatedBlockCount()); + String poolId = dfsCluster.getNamesystem().getBlockPoolId(); + fin = dfsClient.open(fileName); + // get the located blocks from fin + LocatedBlocks finLocatedBlocks = fin.locatedBlocks; + assertEquals(dfsClientPrefetchSize / BLOCK_SIZE, + finLocatedBlocks.locatedBlockCount()); + final int chunkReadSize = BLOCK_SIZE / 4; + byte[] readBuffer = new byte[chunkReadSize]; + // read the first block + DatanodeInfo prevDNInfo = null; + DatanodeInfo currDNInfo = null; + int bytesRead = 0; + int firstBlockMark = BLOCK_SIZE; + // get the second block locations + LocatedBlock firstLocatedBlk = + fin.locatedBlocks.getLocatedBlocks().get(0); + DatanodeInfo[] firstBlkDNInfos = firstLocatedBlk.getLocations(); + while (fin.getPos() < firstBlockMark) { + bytesRead = fin.read(readBuffer); + Assert.assertTrue("Unexpected number of read bytes", + chunkReadSize >= bytesRead); + if (currDNInfo == null) { + currDNInfo = fin.getCurrentDatanode(); + assertNotNull("current FIS datanode is null", currDNInfo); + continue; + } + prevDNInfo = currDNInfo; + currDNInfo = fin.getCurrentDatanode(); + assertEquals("the DFSInput stream does not read from same node", + prevDNInfo, currDNInfo); + } + + assertEquals("InputStream exceeds expected position", + firstBlockMark, fin.getPos()); + // get the second block locations + LocatedBlock secondLocatedBlk = + fin.locatedBlocks.getLocatedBlocks().get(1); + // get the nodeinfo for that block + DatanodeInfo[] secondBlkDNInfos = secondLocatedBlk.getLocations(); + DatanodeInfo deadNodeInfo = secondBlkDNInfos[0]; + // stop the datanode in the list of the + DataNode deadNode = getdataNodeFromHostName(dfsCluster, + deadNodeInfo.getHostName()); + // Shutdown and wait for datanode to be marked dead + DatanodeRegistration reg = InternalDataNodeTestUtils. + getDNRegistrationForBP(dfsCluster.getDataNodes().get(0), poolId); + DataNodeProperties stoppedDNProps = + dfsCluster.stopDataNode(deadNodeInfo.getName()); + + List<DataNode> datanodesPostStoppage = dfsCluster.getDataNodes(); + assertEquals(NUM_DATA_NODES - 1, datanodesPostStoppage.size()); + // get the located blocks + LocatedBlocks afterStoppageLocatedBlocks = + dfsClient.getLocatedBlocks(fileName, 0, fileLength); + // read second block + int secondBlockMark = (int) (1.5 * BLOCK_SIZE); + boolean firstIteration = true; + if (this.enableBlkExpiration) { + // set the time stamps to make sure that we do not refresh locations yet + fin.setReadTimeStampsForTesting(Time.monotonicNow()); + } + while (fin.getPos() < secondBlockMark) { + bytesRead = fin.read(readBuffer); + assertTrue("dead node used to read at position: " + fin.getPos(), + fin.deadNodesContain(deadNodeInfo)); + Assert.assertTrue("Unexpected number of read bytes", + chunkReadSize >= bytesRead); + prevDNInfo = currDNInfo; + currDNInfo = fin.getCurrentDatanode(); + assertNotEquals(deadNodeInfo, currDNInfo); + if (firstIteration) { + // currDNInfo has to be different unless first block locs is different + assertFalse("FSInputStream should pick a different DN", + firstBlkDNInfos[0].equals(deadNodeInfo) + && prevDNInfo.equals(currDNInfo)); + firstIteration = false; + } + } + assertEquals("InputStream exceeds expected position", + secondBlockMark, fin.getPos()); + // restart the dead node with the same port + assertTrue(dfsCluster.restartDataNode(stoppedDNProps, true)); + dfsCluster.waitActive(); + List<DataNode> datanodesPostRestart = dfsCluster.getDataNodes(); + assertEquals(NUM_DATA_NODES, datanodesPostRestart.size()); + // continue reading from block 2 again. We should read from deadNode + int thirdBlockMark = 2 * BLOCK_SIZE; + firstIteration = true; + while (fin.getPos() < thirdBlockMark) { + bytesRead = fin.read(readBuffer); + if (this.enableBlkExpiration) { + assertEquals("node is removed from deadNodes after 1st iteration", + firstIteration, fin.deadNodesContain(deadNodeInfo)); + } else { + assertTrue(fin.deadNodesContain(deadNodeInfo)); + } + Assert.assertTrue("Unexpected number of read bytes", + chunkReadSize >= bytesRead); + prevDNInfo = currDNInfo; + currDNInfo = fin.getCurrentDatanode(); + if (!this.enableBlkExpiration) { + assertNotEquals(deadNodeInfo, currDNInfo); + } + if (firstIteration) { + assertEquals(prevDNInfo, currDNInfo); + firstIteration = false; + if (this.enableBlkExpiration) { + // reset the time stamps of located blocks to force cache expiration + fin.setReadTimeStampsForTesting( + Time.monotonicNow() - (dfsInputLocationsTimeout + 1)); + } + } + } + assertEquals("InputStream exceeds expected position", + thirdBlockMark, fin.getPos()); + } finally { + if (fout != null) { + fout.close(); + } + if (fin != null) { + fin.close(); + } + } + } + + private DataNode getdataNodeFromHostName(MiniDFSCluster cluster, + String hostName) { + for (DataNode dn : cluster.getDataNodes()) { + if (dn.getDatanodeId().getHostName().equals(hostName)) { + return dn; + } + } + return null; + } +} \ No newline at end of file --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org For additional commands, e-mail: common-commits-h...@hadoop.apache.org