Author: umamahesh
Date: Tue Sep 25 01:46:50 2012
New Revision: 1389678

URL: http://svn.apache.org/viewvc?rev=1389678&view=rev
Log:
HDFS-3701. HDFS may miss the final block when reading a file opened for writing
if one of the datanodes is dead. Contributed by Uma Maheswara Rao G and nkeywal.
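
In short, the change makes DFSInputStream.openInfo() retry fetching the block
locations when the locations of the last (under-construction) block are not yet
available from the NameNode, e.g. right after a cluster restart before the
DataNodes have re-registered. A consolidated sketch of the retry loop added in
DFSClient.java below (the retry count of 3 and the 4000 ms wait come from the
diff; the rest is condensed for readability and is not the exact committed code):

    synchronized void openInfo() throws IOException {
      for (int retries = 3; retries > 0; retries--) {
        if (fetchLocatedBlocks()) {
          return; // last-block locations obtained
        }
        // DataNodes may not have reported their blocks to the NameNode yet.
        DFSClient.LOG.warn("Last block locations unavailable. Will retry "
            + retries + " more time(s).");
        try {
          Thread.sleep(4000);
        } catch (InterruptedException e) {
          throw new InterruptedIOException(
              "Interrupted while getting the last block length.");
        }
      }
      throw new IOException("Could not obtain the last block locations.");
    }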


Added:
    hadoop/common/branches/branch-1/src/test/org/apache/hadoop/hdfs/TestFileLengthOnClusterRestart.java
Modified:
    hadoop/common/branches/branch-1/CHANGES.txt
    hadoop/common/branches/branch-1/src/hdfs/org/apache/hadoop/hdfs/DFSClient.java

Modified: hadoop/common/branches/branch-1/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/CHANGES.txt?rev=1389678&r1=1389677&r2=1389678&view=diff
==============================================================================
--- hadoop/common/branches/branch-1/CHANGES.txt (original)
+++ hadoop/common/branches/branch-1/CHANGES.txt Tue Sep 25 01:46:50 2012
@@ -588,6 +588,9 @@ Release 1.1.0 - unreleased
     MAPREDUCE-4675. Fixed a race condition caused in TestKillSubProcesses caused
     due to a recent commit. (Bikas Saha via vinodkv)
 
+    HDFS-3701. HDFS may miss the final block when reading a file opened for writing
+    if one of the datanodes is dead. (umamahesh and nkeywal via umamahesh)
+
 Release 1.0.4 - Unreleased
 
   NEW FEATURES

Modified: hadoop/common/branches/branch-1/src/hdfs/org/apache/hadoop/hdfs/DFSClient.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/hdfs/org/apache/hadoop/hdfs/DFSClient.java?rev=1389678&r1=1389677&r2=1389678&view=diff
==============================================================================
--- hadoop/common/branches/branch-1/src/hdfs/org/apache/hadoop/hdfs/DFSClient.java (original)
+++ hadoop/common/branches/branch-1/src/hdfs/org/apache/hadoop/hdfs/DFSClient.java Tue Sep 25 01:46:50 2012
@@ -1945,74 +1945,126 @@ public class DFSClient implements FSCons
      * Grab the open-file info from namenode
      */
     synchronized void openInfo() throws IOException {
-      LocatedBlocks newInfo = callGetBlockLocations(namenode, src, 0, prefetchSize);
+      for (int retries = 3; retries > 0; retries--) {
+        if (fetchLocatedBlocks()) {
+          // successfully fetched the block locations
+          return;
+        } else {
+          // The last block's locations are unavailable. When a cluster
+          // restarts, DataNodes may not have reported yet, so the NameNode
+          // cannot supply the locations needed to determine the block's
+          // length. Retry a few times to get the length.
+          DFSClient.LOG.warn("Last block locations unavailable. "
+              + "Datanodes might not have reported blocks completely."
+              + " Will retry for " + retries + " times");
+          waitFor(4000);
+        }
+      }
+      throw new IOException("Could not obtain the last block locations.");
+    }
+    
+    private void waitFor(int waitTime) throws InterruptedIOException {
+      try {
+        Thread.sleep(waitTime);
+      } catch (InterruptedException e) {
+        throw new InterruptedIOException(
+            "Interrupted while getting the last block length.");
+      }
+    }
+
+    private boolean fetchLocatedBlocks() throws IOException,
+        FileNotFoundException {
+      LocatedBlocks newInfo = callGetBlockLocations(namenode, src, 0,
+          prefetchSize);
       if (newInfo == null) {
         throw new FileNotFoundException("File does not exist: " + src);
       }
 
-      // I think this check is not correct. A file could have been appended to
-      // between two calls to openInfo().
-      if (locatedBlocks != null && !locatedBlocks.isUnderConstruction() &&
-          !newInfo.isUnderConstruction()) {
-          Iterator<LocatedBlock> oldIter = locatedBlocks.getLocatedBlocks().iterator();
+      if (locatedBlocks != null && !locatedBlocks.isUnderConstruction()
+          && !newInfo.isUnderConstruction()) {
+        Iterator<LocatedBlock> oldIter = locatedBlocks.getLocatedBlocks()
+            .iterator();
         Iterator<LocatedBlock> newIter = newInfo.getLocatedBlocks().iterator();
         while (oldIter.hasNext() && newIter.hasNext()) {
-          if (! oldIter.next().getBlock().equals(newIter.next().getBlock())) {
+          if (!oldIter.next().getBlock().equals(newIter.next().getBlock())) {
             throw new IOException("Blocklist for " + src + " has changed!");
           }
         }
       }
-      updateBlockInfo(newInfo);
+      boolean isBlkInfoUpdated = updateBlockInfo(newInfo);
       this.locatedBlocks = newInfo;
       this.currentNode = null;
+      return isBlkInfoUpdated;
     }
     
     /** 
      * For files under construction, update the last block size based
      * on the length of the block from the datanode.
      */
-    private void updateBlockInfo(LocatedBlocks newInfo) {
+    private boolean updateBlockInfo(LocatedBlocks newInfo) throws IOException {
       if (!serverSupportsHdfs200 || !newInfo.isUnderConstruction()
           || !(newInfo.locatedBlockCount() > 0)) {
-        return;
+        return true;
       }
 
       LocatedBlock last = newInfo.get(newInfo.locatedBlockCount() - 1);
       boolean lastBlockInFile = (last.getStartOffset() + last.getBlockSize() == newInfo
           .getFileLength());
-      if (!lastBlockInFile || last.getLocations().length <= 0) {
-        return;
+      if (!lastBlockInFile) {
+        return true;
+      }
+      
+      if (last.getLocations().length == 0) {
+        return false;
       }
+      
       ClientDatanodeProtocol primary = null;
-      DatanodeInfo primaryNode = last.getLocations()[0];
-      try {
-        primary = createClientDatanodeProtocolProxy(primaryNode, conf,
-            last.getBlock(), last.getBlockToken(), socketTimeout,
-            connectToDnViaHostname);
-        Block newBlock = primary.getBlockInfo(last.getBlock());
-        long newBlockSize = newBlock.getNumBytes();
-        long delta = newBlockSize - last.getBlockSize();
-        // if the size of the block on the datanode is different
-        // from what the NN knows about, the datanode wins!
-        last.getBlock().setNumBytes(newBlockSize);
-        long newlength = newInfo.getFileLength() + delta;
-        newInfo.setFileLength(newlength);
-        LOG.debug("DFSClient setting last block " + last + " to length "
-            + newBlockSize + " filesize is now " + newInfo.getFileLength());
-      } catch (IOException e) {
-        if (e.getMessage().startsWith(
-            "java.io.IOException: java.lang.NoSuchMethodException: "
-                + "org.apache.hadoop.hdfs.protocol"
-                + ".ClientDatanodeProtocol.getBlockInfo")) {
-          // We're talking to a server that doesn't implement HDFS-200.
-          serverSupportsHdfs200 = false;
-        } else {
-          LOG.debug("DFSClient file " + src
-              + " is being concurrently append to" + " but datanode "
-              + primaryNode.getHostName() + " probably does not have block "
-              + last.getBlock());
+      Block newBlock = null;
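+      // Try each replica of the last block until one returns its on-disk length.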
+      for (int i = 0; i < last.getLocations().length && newBlock == null; i++) {
+        DatanodeInfo datanode = last.getLocations()[i];
+        try {
+          primary = createClientDatanodeProtocolProxy(datanode, conf, last
+              .getBlock(), last.getBlockToken(), socketTimeout,
+              connectToDnViaHostname);
+          newBlock = primary.getBlockInfo(last.getBlock());
+        } catch (IOException e) {
+          if (e.getMessage().startsWith(
+              "java.io.IOException: java.lang.NoSuchMethodException: "
+                  + "org.apache.hadoop.hdfs.protocol"
+                  + ".ClientDatanodeProtocol.getBlockInfo")) {
+            // We're talking to a server that doesn't implement HDFS-200.
+            serverSupportsHdfs200 = false;
+          } else {
+            LOG.info("Failed to get block info from "
+                + datanode.getHostName() + " probably does not have block "
+                + last.getBlock(), e);
+          }
+        } finally {
+          if (primary != null) {
+            RPC.stopProxy(primary);
+          }
         }
       }
+      
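+      // No replica returned the block info. If the DataNodes predate HDFS-200
+      // (no getBlockInfo RPC), fall back to the NameNode's view of the length;
+      // otherwise fail, since none of the listed replicas could be queried.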
+      if (newBlock == null) {
+        if (!serverSupportsHdfs200) {
+          return true;
+        }
+        throw new IOException(
+            "Failed to get block info from any of the DN in pipeline: "
+                    + Arrays.toString(last.getLocations()));
+      }
+      
+      long newBlockSize = newBlock.getNumBytes();
+      long delta = newBlockSize - last.getBlockSize();
+      // if the size of the block on the datanode is different
+      // from what the NN knows about, the datanode wins!
+      last.getBlock().setNumBytes(newBlockSize);
+      long newlength = newInfo.getFileLength() + delta;
+      newInfo.setFileLength(newlength);
+      LOG.debug("DFSClient setting last block " + last + " to length "
+          + newBlockSize + " filesize is now " + newInfo.getFileLength());
+      return true;
     }
     
     public synchronized long getFileLength() {

Added: hadoop/common/branches/branch-1/src/test/org/apache/hadoop/hdfs/TestFileLengthOnClusterRestart.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/test/org/apache/hadoop/hdfs/TestFileLengthOnClusterRestart.java?rev=1389678&view=auto
==============================================================================
--- hadoop/common/branches/branch-1/src/test/org/apache/hadoop/hdfs/TestFileLengthOnClusterRestart.java (added)
+++ hadoop/common/branches/branch-1/src/test/org/apache/hadoop/hdfs/TestFileLengthOnClusterRestart.java Tue Sep 25 01:46:50 2012
@@ -0,0 +1,97 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs;
+
+import java.io.IOException;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.DFSClient.DFSDataInputStream;
+import org.apache.hadoop.hdfs.protocol.FSConstants.SafeModeAction;
+import org.junit.Assert;
+import org.junit.Test;
+
+/** Test the fileLength on cluster restarts */
+public class TestFileLengthOnClusterRestart {
+  /**
+   * Tests the file length when we sync the file, restart the cluster, and the
+   * Datanodes have not yet reported their blocks to the Namenode.
+   */
+  @Test(timeout = 60000)
+  public void testFileLengthWithHSyncAndClusterRestartWithOutDNsRegister()
+      throws Exception {
+    final Configuration conf = new Configuration();
+    // Use a small block size so the 1030-byte test file spans multiple blocks.
+    conf.setInt(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, 512);
+
+    final MiniDFSCluster cluster = new MiniDFSCluster(conf, 2, true, null);
+    DFSDataInputStream in = null;
+    FSDataOutputStream out = null;
+    DistributedFileSystem dfs = null;
+    try {
+      Path path = new Path(MiniDFSCluster.getBaseDir().getPath(), "test");
+      dfs = (DistributedFileSystem) cluster.getFileSystem();
+      out = dfs.create(path);
+      int fileLength = 1030;
+      out.write(new byte[fileLength]);
+      out.sync();
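+      // sync() pushes the written bytes to the DataNodes so a new reader can see them.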
+      cluster.restartNameNode();
+      cluster.waitActive();
+      in = (DFSDataInputStream) dfs.open(path, 1024);
+      // Verify the length when we just restart NN. DNs will register
+      // immediately.
+      Assert.assertEquals(fileLength, in.getVisibleLength());
+      cluster.shutdownDataNodes();
+      cluster.restartNameNode(false);
+      // This just ensures that the NN has started.
+      verifyNNIsInSafeMode(dfs);
+
+      try {
+        in = (DFSDataInputStream) dfs.open(path);
+        Assert.fail("Expected IOException");
+      } catch (IOException e) {
+        Assert.assertTrue(e.getLocalizedMessage().indexOf(
+            "Name node is in safe mode") >= 0);
+      }
+
+    } finally {
+      if (null != in) {
+        in.close();
+      }
+      if (null != dfs) {
+        dfs.dfs.clientRunning = false;
+      }
+      cluster.shutdown();
+    }
+  }
+
+  private void verifyNNIsInSafeMode(DistributedFileSystem dfs)
+      throws IOException {
+    while (true) {
+      try {
+        if (dfs.dfs.namenode.setSafeMode(SafeModeAction.SAFEMODE_GET)) {
+          return;
+        } else {
+          throw new IOException("Expected to be in SafeMode");
+        }
+      } catch (IOException e) {
+        // NN might not have started completely yet; ignore and retry.
+      }
+    }
+  }
+}

