Author: umamahesh
Date: Tue Sep 25 01:46:50 2012
New Revision: 1389678

URL: http://svn.apache.org/viewvc?rev=1389678&view=rev
Log:
HDFS-3701. HDFS may miss the final block when reading a file opened for
writing if one of the datanode is dead. Contributed by Uma MAheswara Rao G
and nkeywal.
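In short: before this change, DFSClient computed the visible length of a file under construction by asking only the first datanode in the last block's pipeline; if that datanode was dead, the reader silently missed the final block. The patch (a) asks every replica in the pipeline and (b) retries openInfo() a few times, since datanodes may not have re-registered with the namenode right after a cluster restart. Below is a minimal, self-contained sketch of that retry pattern; the RetrySketch class and LocationFetcher interface are illustrative stand-ins, not names from the patch (the real logic lives in DFSInputStream.openInfo(), shown in the diff that follows).

    import java.io.IOException;
    import java.io.InterruptedIOException;

    // Illustrative sketch only; the names here are hypothetical.
    public class RetrySketch {

      // Stand-in for DFSInputStream.fetchLocatedBlocks(): returns true once
      // the last block's locations (and hence its length) are resolved.
      interface LocationFetcher {
        boolean fetchLocatedBlocks() throws IOException;
      }

      static void openInfo(LocationFetcher fetcher) throws IOException {
        // Try a bounded number of times, sleeping between attempts so that
        // restarting datanodes get a chance to report their blocks.
        for (int retries = 3; retries > 0; retries--) {
          if (fetcher.fetchLocatedBlocks()) {
            return; // last block length resolved
          }
          try {
            Thread.sleep(4000); // same 4s wait the patch uses
          } catch (InterruptedException e) {
            throw new InterruptedIOException(
                "Interrupted while getting the last block length.");
          }
        }
        throw new IOException("Could not obtain the last block locations.");
      }
    }

Bounding the loop at three attempts with a 4-second wait keeps a reader from blocking forever against a dead cluster while still riding out normal restart-time block reports.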
Added:
    hadoop/common/branches/branch-1/src/test/org/apache/hadoop/hdfs/TestFileLengthOnClusterRestart.java
Modified:
    hadoop/common/branches/branch-1/CHANGES.txt
    hadoop/common/branches/branch-1/src/hdfs/org/apache/hadoop/hdfs/DFSClient.java

Modified: hadoop/common/branches/branch-1/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/CHANGES.txt?rev=1389678&r1=1389677&r2=1389678&view=diff
==============================================================================
--- hadoop/common/branches/branch-1/CHANGES.txt (original)
+++ hadoop/common/branches/branch-1/CHANGES.txt Tue Sep 25 01:46:50 2012
@@ -588,6 +588,9 @@ Release 1.1.0 - unreleased
     MAPREDUCE-4675. Fixed a race condition caused in TestKillSubProcesses
     caused due to a recent commit. (Bikas Saha via vinodkv)
 
+    HDFS-3701. HDFS may miss the final block when reading a file opened for writing
+    if one of the datanode is dead. (umamahesh and nkeywal via umamahesh)
+
 Release 1.0.4 - Unreleased
 
   NEW FEATURES

Modified: hadoop/common/branches/branch-1/src/hdfs/org/apache/hadoop/hdfs/DFSClient.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/hdfs/org/apache/hadoop/hdfs/DFSClient.java?rev=1389678&r1=1389677&r2=1389678&view=diff
==============================================================================
--- hadoop/common/branches/branch-1/src/hdfs/org/apache/hadoop/hdfs/DFSClient.java (original)
+++ hadoop/common/branches/branch-1/src/hdfs/org/apache/hadoop/hdfs/DFSClient.java Tue Sep 25 01:46:50 2012
@@ -1945,74 +1945,126 @@ public class DFSClient implements FSCons
      * Grab the open-file info from namenode
      */
     synchronized void openInfo() throws IOException {
-      LocatedBlocks newInfo = callGetBlockLocations(namenode, src, 0, prefetchSize);
+      for (int retries = 3; retries > 0; retries--) {
+        if (fetchLocatedBlocks()) {
+          // fetch block success
+          return;
+        } else {
+          // Last block location unavailable. When a cluster restarts,
+          // DNs may not report immediately. At this time partial block
+          // locations will not be available with NN for getting the length.
+          // Lets retry a few times to get the length.
+          DFSClient.LOG.warn("Last block locations unavailable. "
+              + "Datanodes might not have reported blocks completely."
+              + " Will retry for " + retries + " times");
+          waitFor(4000);
+        }
+      }
+      throw new IOException("Could not obtain the last block locations.");
+    }
+
+    private void waitFor(int waitTime) throws InterruptedIOException {
+      try {
+        Thread.sleep(waitTime);
+      } catch (InterruptedException e) {
+        throw new InterruptedIOException(
+            "Interrupted while getting the last block length.");
+      }
+    }
+
+    private boolean fetchLocatedBlocks() throws IOException,
+        FileNotFoundException {
+      LocatedBlocks newInfo = callGetBlockLocations(namenode, src, 0,
+          prefetchSize);
       if (newInfo == null) {
         throw new FileNotFoundException("File does not exist: " + src);
       }
-      // I think this check is not correct. A file could have been appended to
-      // between two calls to openInfo().
-      if (locatedBlocks != null && !locatedBlocks.isUnderConstruction() &&
-          !newInfo.isUnderConstruction()) {
-        Iterator<LocatedBlock> oldIter = locatedBlocks.getLocatedBlocks().iterator();
+      if (locatedBlocks != null && !locatedBlocks.isUnderConstruction()
+          && !newInfo.isUnderConstruction()) {
+        Iterator<LocatedBlock> oldIter = locatedBlocks.getLocatedBlocks()
+            .iterator();
         Iterator<LocatedBlock> newIter = newInfo.getLocatedBlocks().iterator();
         while (oldIter.hasNext() && newIter.hasNext()) {
-          if (! oldIter.next().getBlock().equals(newIter.next().getBlock())) {
+          if (!oldIter.next().getBlock().equals(newIter.next().getBlock())) {
            throw new IOException("Blocklist for " + src + " has changed!");
          }
        }
      }
-      updateBlockInfo(newInfo);
+      boolean isBlkInfoUpdated = updateBlockInfo(newInfo);
       this.locatedBlocks = newInfo;
       this.currentNode = null;
+      return isBlkInfoUpdated;
     }
 
     /**
      * For files under construction, update the last block size based
      * on the length of the block from the datanode.
      */
-    private void updateBlockInfo(LocatedBlocks newInfo) {
+    private boolean updateBlockInfo(LocatedBlocks newInfo) throws IOException {
       if (!serverSupportsHdfs200 || !newInfo.isUnderConstruction()
           || !(newInfo.locatedBlockCount() > 0)) {
-        return;
+        return true;
       }
 
       LocatedBlock last = newInfo.get(newInfo.locatedBlockCount() - 1);
       boolean lastBlockInFile = (last.getStartOffset() + last.getBlockSize() == newInfo
           .getFileLength());
-      if (!lastBlockInFile || last.getLocations().length <= 0) {
-        return;
+      if (!lastBlockInFile) {
+        return true;
+      }
+
+      if (last.getLocations().length == 0) {
+        return false;
       }
+
       ClientDatanodeProtocol primary = null;
-      DatanodeInfo primaryNode = last.getLocations()[0];
-      try {
-        primary = createClientDatanodeProtocolProxy(primaryNode, conf,
-            last.getBlock(), last.getBlockToken(), socketTimeout,
-            connectToDnViaHostname);
-        Block newBlock = primary.getBlockInfo(last.getBlock());
-        long newBlockSize = newBlock.getNumBytes();
-        long delta = newBlockSize - last.getBlockSize();
-        // if the size of the block on the datanode is different
-        // from what the NN knows about, the datanode wins!
-        last.getBlock().setNumBytes(newBlockSize);
-        long newlength = newInfo.getFileLength() + delta;
-        newInfo.setFileLength(newlength);
-        LOG.debug("DFSClient setting last block " + last + " to length "
-            + newBlockSize + " filesize is now " + newInfo.getFileLength());
-      } catch (IOException e) {
-        if (e.getMessage().startsWith(
-            "java.io.IOException: java.lang.NoSuchMethodException: "
-                + "org.apache.hadoop.hdfs.protocol"
-                + ".ClientDatanodeProtocol.getBlockInfo")) {
-          // We're talking to a server that doesn't implement HDFS-200.
-          serverSupportsHdfs200 = false;
-        } else {
-          LOG.debug("DFSClient file " + src
-              + " is being concurrently append to" + " but datanode "
-              + primaryNode.getHostName() + " probably does not have block "
-              + last.getBlock());
+      Block newBlock = null;
+      for (int i = 0; i < last.getLocations().length && newBlock == null; i++) {
+        DatanodeInfo datanode = last.getLocations()[i];
+        try {
+          primary = createClientDatanodeProtocolProxy(datanode, conf, last
+              .getBlock(), last.getBlockToken(), socketTimeout,
+              connectToDnViaHostname);
+          newBlock = primary.getBlockInfo(last.getBlock());
+        } catch (IOException e) {
+          if (e.getMessage().startsWith(
+              "java.io.IOException: java.lang.NoSuchMethodException: "
+                  + "org.apache.hadoop.hdfs.protocol"
+                  + ".ClientDatanodeProtocol.getBlockInfo")) {
+            // We're talking to a server that doesn't implement HDFS-200.
+            serverSupportsHdfs200 = false;
+          } else {
+            LOG.info("Failed to get block info from "
+                + datanode.getHostName() + " probably does not have block "
+                + last.getBlock(), e);
+          }
+        } finally {
+          if (primary != null) {
+            RPC.stopProxy(primary);
+          }
         }
       }
+
+      if (newBlock == null) {
+        if (!serverSupportsHdfs200) {
+          return true;
+        }
+        throw new IOException(
+            "Failed to get block info from any of the DN in pipeline: "
+                + Arrays.toString(last.getLocations()));
+      }
+
+      long newBlockSize = newBlock.getNumBytes();
+      long delta = newBlockSize - last.getBlockSize();
+      // if the size of the block on the datanode is different
+      // from what the NN knows about, the datanode wins!
+      last.getBlock().setNumBytes(newBlockSize);
+      long newlength = newInfo.getFileLength() + delta;
+      newInfo.setFileLength(newlength);
+      LOG.debug("DFSClient setting last block " + last + " to length "
+          + newBlockSize + " filesize is now " + newInfo.getFileLength());
+      return true;
     }
 
     public synchronized long getFileLength() {

Added: hadoop/common/branches/branch-1/src/test/org/apache/hadoop/hdfs/TestFileLengthOnClusterRestart.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/test/org/apache/hadoop/hdfs/TestFileLengthOnClusterRestart.java?rev=1389678&view=auto
==============================================================================
--- hadoop/common/branches/branch-1/src/test/org/apache/hadoop/hdfs/TestFileLengthOnClusterRestart.java (added)
+++ hadoop/common/branches/branch-1/src/test/org/apache/hadoop/hdfs/TestFileLengthOnClusterRestart.java Tue Sep 25 01:46:50 2012
@@ -0,0 +1,97 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.DFSClient.DFSDataInputStream;
+import org.apache.hadoop.hdfs.protocol.FSConstants.SafeModeAction;
+import org.junit.Assert;
+import org.junit.Test;
+
+/** Test the fileLength on cluster restarts */
+public class TestFileLengthOnClusterRestart {
+  /**
+   * Tests the fileLength when we sync the file and restart the cluster and
+   * Datanodes not report to Namenode yet.
+   */
+  @Test(timeout = 60000)
+  public void testFileLengthWithHSyncAndClusterRestartWithOutDNsRegister()
+      throws Exception {
+    final Configuration conf = new Configuration();
+    // create cluster
+    conf.setInt(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, 512);
+
+    final MiniDFSCluster cluster = new MiniDFSCluster(conf, 2, true, null);
+    DFSDataInputStream in = null;
+    FSDataOutputStream out = null;
+    DistributedFileSystem dfs = null;
+    try {
+      Path path = new Path(MiniDFSCluster.getBaseDir().getPath(), "test");
+      dfs = (DistributedFileSystem) cluster.getFileSystem();
+      out = dfs.create(path);
+      int fileLength = 1030;
+      out.write(new byte[fileLength]);
+      out.sync();
+      cluster.restartNameNode();
+      cluster.waitActive();
+      in = (DFSDataInputStream) dfs.open(path, 1024);
+      // Verify the length when we just restart NN. DNs will register
+      // immediately.
+      Assert.assertEquals(fileLength, in.getVisibleLength());
+      cluster.shutdownDataNodes();
+      cluster.restartNameNode(false);
+      // This is just for ensuring NN started.
+      verifyNNIsInSafeMode(dfs);
+
+      try {
+        in = (DFSDataInputStream) dfs.open(path);
+        Assert.fail("Expected IOException");
+      } catch (IOException e) {
+        Assert.assertTrue(e.getLocalizedMessage().indexOf(
+            "Name node is in safe mode") >= 0);
+      }
+
+    } finally {
+      if (null != in) {
+        in.close();
+      }
+      if (null != dfs) {
+        dfs.dfs.clientRunning = false;
+      }
+      cluster.shutdown();
+    }
+  }
+
+  private void verifyNNIsInSafeMode(DistributedFileSystem dfs)
+      throws IOException {
+    while (true) {
+      try {
+        if (dfs.dfs.namenode.setSafeMode(SafeModeAction.SAFEMODE_GET)) {
+          return;
+        } else {
+          throw new IOException("Expected to be in SafeMode");
+        }
+      } catch (IOException e) {
+        // NN might not started completely Ignore
+      }
+    }
+  }
+}
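For context, here is a hypothetical reader that exercises the fixed path: it opens a file another client still has open for writing and checks the visible length. The path name and the fs.default.name assumption are illustrative only; getVisibleLength() is the same branch-1 API the test above uses.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hdfs.DFSClient.DFSDataInputStream;

    public class VisibleLengthCheck {
      public static void main(String[] args) throws Exception {
        // Assumes fs.default.name in the loaded configuration points at a
        // running branch-1 HDFS cluster; the path below is hypothetical.
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(conf);
        Path path = new Path("/tmp/file-open-for-write");

        DFSDataInputStream in = (DFSDataInputStream) fs.open(path);
        try {
          // With this fix, the length includes synced data in the last
          // (under-construction) block even if the first datanode in that
          // block's pipeline is dead: the client now asks every replica.
          System.out.println("Visible length: " + in.getVisibleLength());
        } finally {
          in.close();
        }
      }
    }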