Author: hairong
Date: Fri Jan  7 20:11:38 2011
New Revision: 1056483

URL: http://svn.apache.org/viewvc?rev=1056483&view=rev
Log:
HDFS-1555. Disallow pipeline recovery if a file is already being lease 
recovered. Contributed by Hairong Kuang.

Modified:
    hadoop/common/branches/branch-0.20-append/CHANGES.txt
    
hadoop/common/branches/branch-0.20-append/src/hdfs/org/apache/hadoop/hdfs/DistributedFileSystem.java
    
hadoop/common/branches/branch-0.20-append/src/hdfs/org/apache/hadoop/hdfs/protocol/ClientProtocol.java
    
hadoop/common/branches/branch-0.20-append/src/hdfs/org/apache/hadoop/hdfs/server/datanode/DataNode.java
    
hadoop/common/branches/branch-0.20-append/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java
    
hadoop/common/branches/branch-0.20-append/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
    
hadoop/common/branches/branch-0.20-append/src/hdfs/org/apache/hadoop/hdfs/server/namenode/LeaseManager.java
    
hadoop/common/branches/branch-0.20-append/src/hdfs/org/apache/hadoop/hdfs/server/namenode/NameNode.java
    
hadoop/common/branches/branch-0.20-append/src/hdfs/org/apache/hadoop/hdfs/server/protocol/DatanodeProtocol.java
    
hadoop/common/branches/branch-0.20-append/src/test/org/apache/hadoop/hdfs/TestLeaseRecovery2.java

Modified: hadoop/common/branches/branch-0.20-append/CHANGES.txt
URL: 
http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-append/CHANGES.txt?rev=1056483&r1=1056482&r2=1056483&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-append/CHANGES.txt (original)
+++ hadoop/common/branches/branch-0.20-append/CHANGES.txt Fri Jan  7 20:11:38 
2011
@@ -91,6 +91,9 @@ Release 0.20-append - Unreleased
     HDFS-724.  Use a bidirectional heartbeat to detect stuck
     pipeline. (hairong)
 
+    HDFS-1555. Disallow pipeline recovery if a file is already being
+    lease recovered. (hairong)
+
 Release 0.20.3 - Unreleased
 
   NEW FEATURES

Modified: 
hadoop/common/branches/branch-0.20-append/src/hdfs/org/apache/hadoop/hdfs/DistributedFileSystem.java
URL: 
http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-append/src/hdfs/org/apache/hadoop/hdfs/DistributedFileSystem.java?rev=1056483&r1=1056482&r2=1056483&view=diff
==============================================================================
--- 
hadoop/common/branches/branch-0.20-append/src/hdfs/org/apache/hadoop/hdfs/DistributedFileSystem.java
 (original)
+++ 
hadoop/common/branches/branch-0.20-append/src/hdfs/org/apache/hadoop/hdfs/DistributedFileSystem.java
 Fri Jan  7 20:11:38 2011
@@ -180,7 +180,8 @@ public class DistributedFileSystem exten
   }
 
   /** 
-   * Trigger the lease reovery of a file
+   * Start the lease recovery of a file
+   *
    * @param f a file
    * @throws IOException if an error occurs
    */

Modified: 
hadoop/common/branches/branch-0.20-append/src/hdfs/org/apache/hadoop/hdfs/protocol/ClientProtocol.java
URL: 
http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-append/src/hdfs/org/apache/hadoop/hdfs/protocol/ClientProtocol.java?rev=1056483&r1=1056482&r2=1056483&view=diff
==============================================================================
--- 
hadoop/common/branches/branch-0.20-append/src/hdfs/org/apache/hadoop/hdfs/protocol/ClientProtocol.java
 (original)
+++ 
hadoop/common/branches/branch-0.20-append/src/hdfs/org/apache/hadoop/hdfs/protocol/ClientProtocol.java
 Fri Jan  7 20:11:38 2011
@@ -124,8 +124,9 @@ public interface ClientProtocol extends 
   public LocatedBlock append(String src, String clientName) throws IOException;
   
   /**
-   * Trigger lease recovery to happen
-   * @param src path of the file to trigger lease recovery
+   * Start lease recovery
+   * 
+   * @param src path of the file to start lease recovery
    * @param clientName name of the current client
    * @throws IOException
    */

Modified: 
hadoop/common/branches/branch-0.20-append/src/hdfs/org/apache/hadoop/hdfs/server/datanode/DataNode.java
URL: 
http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-append/src/hdfs/org/apache/hadoop/hdfs/server/datanode/DataNode.java?rev=1056483&r1=1056482&r2=1056483&view=diff
==============================================================================
--- 
hadoop/common/branches/branch-0.20-append/src/hdfs/org/apache/hadoop/hdfs/server/datanode/DataNode.java
 (original)
+++ 
hadoop/common/branches/branch-0.20-append/src/hdfs/org/apache/hadoop/hdfs/server/datanode/DataNode.java
 Fri Jan  7 20:11:38 2011
@@ -1574,7 +1574,7 @@ public class DataNode extends Configured
 
     List<DatanodeID> successList = new ArrayList<DatanodeID>();
 
-    long generationstamp = namenode.nextGenerationStamp(block);
+    long generationstamp = namenode.nextGenerationStamp(block, closeFile);
     Block newblock = new Block(block.getBlockId(), block.getNumBytes(), generationstamp);
 
     for(BlockRecord r : syncList) {

Modified: 
hadoop/common/branches/branch-0.20-append/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java
URL: 
http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-append/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java?rev=1056483&r1=1056482&r2=1056483&view=diff
==============================================================================
--- 
hadoop/common/branches/branch-0.20-append/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java
 (original)
+++ 
hadoop/common/branches/branch-0.20-append/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java
 Fri Jan  7 20:11:38 2011
@@ -893,6 +893,23 @@ class FSDirectory implements FSConstants
     return fullPathName.toString();
   }
   
+    /** Return the full path name of the specified inode */
+    static String getFullPathName(INode inode) {
+      // calculate the depth of this inode from root
+      int depth = 0;
+      for (INode i = inode; i != null; i = i.parent) {
+        depth++;
+      }
+      INode[] inodes = new INode[depth];
+      
+      // fill up the inodes in the path from this inode to root
+      for (int i = 0; i < depth; i++) {
+        inodes[depth-i-1] = inode;
+        inode = inode.parent;
+      }
+      return getFullPathName(inodes, depth-1);
+   }
+                                                          
   /**
    * Create a directory 
    * If ancestor directories do not exist, automatically create them.

Modified: 
hadoop/common/branches/branch-0.20-append/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
URL: 
http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-append/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java?rev=1056483&r1=1056482&r2=1056483&view=diff
==============================================================================
--- 
hadoop/common/branches/branch-0.20-append/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
 (original)
+++ 
hadoop/common/branches/branch-0.20-append/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
 Fri Jan  7 20:11:38 2011
@@ -1051,7 +1051,7 @@ public class FSNamesystem implements FSC
 
     try {
       INode myFile = dir.getFileINode(src);
-      recoverLeaseInternal(myFile, src, holder, clientMachine);
+      recoverLeaseInternal(myFile, src, holder, clientMachine, false);
 
       try {
         verifyReplication(src, replication, clientMachine);
@@ -1126,11 +1126,11 @@ public class FSNamesystem implements FSC
   }
 
   /**
-   * Trigger to recover lease;
-   * When the method returns successfully, the lease has been recovered and
-   * the file is closed.
+   * Recover lease;
+   * Immediately revoke the lease of the current lease holder and start lease
+   * recovery so that the file can be forced to be closed.
    * 
-   * @param src the path of the file to trigger release
+   * @param src the path of the file to start lease recovery
    * @param holder the lease holder's name
    * @param clientMachine the client machine's name
    * @throws IOException
@@ -1154,11 +1154,11 @@ public class FSNamesystem implements FSC
       checkPathAccess(src, FsAction.WRITE);
     }
 
-    recoverLeaseInternal(inode, src, holder, clientMachine);
+    recoverLeaseInternal(inode, src, holder, clientMachine, true);
   }
   
   private void recoverLeaseInternal(INode fileInode, 
-      String src, String holder, String clientMachine)
+      String src, String holder, String clientMachine, boolean force)
   throws IOException {
     if (fileInode != null && fileInode.isUnderConstruction()) {
       INodeFileUnderConstruction pendingFile = (INodeFileUnderConstruction) fileInode;
@@ -1171,7 +1171,7 @@ public class FSNamesystem implements FSC
       // We found the lease for this file. And surprisingly the original
       // holder is trying to recreate this file. This should never occur.
       //
-      if (lease != null) {
+      if (!force && lease != null) {
         Lease leaseFile = leaseManager.getLeaseByPath(src);
         if (leaseFile != null && leaseFile.equals(lease)) { 
           throw new AlreadyBeingCreatedException(
@@ -1190,21 +1190,29 @@ public class FSNamesystem implements FSC
                     " on client " + clientMachine + 
                     " because pendingCreates is non-null but no leases found.");
       }
-      //
-      // If the original holder has not renewed in the last SOFTLIMIT 
-      // period, then start lease recovery.
-      //
-      if (lease.expiredSoftLimit()) {
-        LOG.info("startFile: recover lease " + lease + ", src=" + src +
+      if (force) {
+        // close now: no need to wait for soft lease expiration and 
+        // close only the file src
+        LOG.info("recoverLease: recover lease " + lease + ", src=" + src +
                  " from client " + pendingFile.clientName);
-        internalReleaseLease(lease, src);
+        internalReleaseLeaseOne(lease, src);
+      } else {
+        //
+        // If the original holder has not renewed in the last SOFTLIMIT 
+        // period, then start lease recovery.
+        //
+        if (lease.expiredSoftLimit()) {
+          LOG.info("startFile: recover lease " + lease + ", src=" + src +
+              " from client " + pendingFile.clientName);
+          internalReleaseLease(lease, src);
+        }
+        throw new AlreadyBeingCreatedException(
+            "failed to create file " + src + " for " + holder +
+            " on client " + clientMachine + 
+            ", because this file is already being created by " +
+            pendingFile.getClientName() + 
+            " on " + pendingFile.getClientMachine());
       }
-      throw new AlreadyBeingCreatedException(
-                    "failed to create file " + src + " for " + holder +
-                    " on client " + clientMachine + 
-                    ", because this file is already being created by " +
-                    pendingFile.getClientName() + 
-                    " on " + pendingFile.getClientMachine());
     }
 
   }
@@ -4848,8 +4856,12 @@ public class FSNamesystem implements FSC
   /**
    * Verifies that the block is associated with a file that has a lease.
    * Increments, logs and then returns the stamp
-   */
-  synchronized long nextGenerationStampForBlock(Block block) throws IOException {
+   *
+   * @param block block
+   * @param fromNN if it is for lease recovery initiated by NameNode
+   * @return a new generation stamp
+   */  
+  synchronized long nextGenerationStampForBlock(Block block, boolean fromNN) throws IOException {
     if (isInSafeMode()) {
       throw new SafeModeException("Cannot get nextGenStamp for " + block, safeMode);
     }
@@ -4865,6 +4877,15 @@ public class FSNamesystem implements FSC
       LOG.info(msg);
       throw new IOException(msg);
     }
+    // Disallow client-initiated recovery once
+    // NameNode initiated lease recovery starts
+    if (!fromNN && HdfsConstants.NN_RECOVERY_LEASEHOLDER.equals(
+        leaseManager.getLeaseByPath(FSDirectory.getFullPathName(fileINode)).getHolder())) {
+      String msg = block +
+        "is being recovered by NameNode, ignoring the request from a client";
+      LOG.info(msg);
+      throw new IOException(msg);
+    }
     if (!((INodeFileUnderConstruction)fileINode).setLastRecoveryTime(now())) {
       String msg = block + " is already being recovered, ignoring this request.";
       LOG.info(msg);

Modified: 
hadoop/common/branches/branch-0.20-append/src/hdfs/org/apache/hadoop/hdfs/server/namenode/LeaseManager.java
URL: 
http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-append/src/hdfs/org/apache/hadoop/hdfs/server/namenode/LeaseManager.java?rev=1056483&r1=1056482&r2=1056483&view=diff
==============================================================================
--- 
hadoop/common/branches/branch-0.20-append/src/hdfs/org/apache/hadoop/hdfs/server/namenode/LeaseManager.java
 (original)
+++ 
hadoop/common/branches/branch-0.20-append/src/hdfs/org/apache/hadoop/hdfs/server/namenode/LeaseManager.java
 Fri Jan  7 20:11:38 2011
@@ -201,6 +201,12 @@ public class LeaseManager {
       this.holder = holder;
       renew();
     }
+
+    /** Get the holder of the lease */
+    public String getHolder() {
+      return holder;
+    }
+
     /** Only LeaseManager object can renew a lease */
     private void renew() {
       this.lastUpdate = FSNamesystem.now();

Modified: 
hadoop/common/branches/branch-0.20-append/src/hdfs/org/apache/hadoop/hdfs/server/namenode/NameNode.java
URL: 
http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-append/src/hdfs/org/apache/hadoop/hdfs/server/namenode/NameNode.java?rev=1056483&r1=1056482&r2=1056483&view=diff
==============================================================================
--- 
hadoop/common/branches/branch-0.20-append/src/hdfs/org/apache/hadoop/hdfs/server/namenode/NameNode.java
 (original)
+++ 
hadoop/common/branches/branch-0.20-append/src/hdfs/org/apache/hadoop/hdfs/server/namenode/NameNode.java
 Fri Jan  7 20:11:38 2011
@@ -497,8 +497,8 @@ public class NameNode implements ClientP
   }
 
   /** {@inheritDoc} */
-  public long nextGenerationStamp(Block block) throws IOException{
-    return namesystem.nextGenerationStampForBlock(block);
+  public long nextGenerationStamp(Block block, boolean fromNN) throws IOException{
+    return namesystem.nextGenerationStampForBlock(block, fromNN);
   }
 
   /** {@inheritDoc} */

Modified: 
hadoop/common/branches/branch-0.20-append/src/hdfs/org/apache/hadoop/hdfs/server/protocol/DatanodeProtocol.java
URL: 
http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-append/src/hdfs/org/apache/hadoop/hdfs/server/protocol/DatanodeProtocol.java?rev=1056483&r1=1056482&r2=1056483&view=diff
==============================================================================
--- 
hadoop/common/branches/branch-0.20-append/src/hdfs/org/apache/hadoop/hdfs/server/protocol/DatanodeProtocol.java
 (original)
+++ 
hadoop/common/branches/branch-0.20-append/src/hdfs/org/apache/hadoop/hdfs/server/protocol/DatanodeProtocol.java
 Fri Jan  7 20:11:38 2011
@@ -35,10 +35,10 @@ import org.apache.hadoop.ipc.VersionedPr
  **********************************************************************/
 public interface DatanodeProtocol extends VersionedProtocol {
   /**
-   * 19: SendHeartbeat returns an array of DatanodeCommand objects
-   *     in stead of a DatanodeCommand object.
+   * 20: nextGenerationStamp has a new parameter indicating if it is for
+   * NameNode initiated lease recovery or not
    */
-  public static final long versionID = 19L;
+  public static final long versionID = 20L;
   
   // error code
   final static int NOTIFY = 0;
@@ -142,10 +142,14 @@ public interface DatanodeProtocol extend
   public void reportBadBlocks(LocatedBlock[] blocks) throws IOException;
   
   /**
-   * @return the next GenerationStamp to be associated with the specified
-   * block. 
+   * Get the next GenerationStamp to be associated with the specified
+   * block.
+   * 
+   * @param block block
+   * @param fromNN if it is for lease recovery initiated by NameNode
+   * @return a new generation stamp
    */
-  public long nextGenerationStamp(Block block) throws IOException;
+  public long nextGenerationStamp(Block block, boolean fromNN) throws IOException;
 
   /**
    * Commit block synchronization in lease recovery

Modified: 
hadoop/common/branches/branch-0.20-append/src/test/org/apache/hadoop/hdfs/TestLeaseRecovery2.java
URL: 
http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-append/src/test/org/apache/hadoop/hdfs/TestLeaseRecovery2.java?rev=1056483&r1=1056482&r2=1056483&view=diff
==============================================================================
--- 
hadoop/common/branches/branch-0.20-append/src/test/org/apache/hadoop/hdfs/TestLeaseRecovery2.java
 (original)
+++ 
hadoop/common/branches/branch-0.20-append/src/test/org/apache/hadoop/hdfs/TestLeaseRecovery2.java
 Fri Jan  7 20:11:38 2011
@@ -64,7 +64,7 @@ public class TestLeaseRecovery2 extends 
       //create a file
       DistributedFileSystem dfs = (DistributedFileSystem)cluster.getFileSystem();
       int size = AppendTestUtil.nextInt(FILE_SIZE);
-      Path filepath = createFile(dfs, size);
+      Path filepath = createFile(dfs, size, true);
 
       // set the soft limit to be 1 second so that the
       // namenode triggers lease recovery on next attempt to write-for-open.
@@ -74,59 +74,60 @@ public class TestLeaseRecovery2 extends 
       verifyFile(dfs, filepath, actual, size);
 
       //test recoverLease
+      // set the soft limit to be 1 hour but recoverLease should
+      // close the file immediately
+      cluster.setLeasePeriod(hardLease, hardLease);
       size = AppendTestUtil.nextInt(FILE_SIZE);
-      filepath = createFile(dfs, size);
+      filepath = createFile(dfs, size, false);
 
-      // set the soft limit to be 1 second so that the
-      // namenode triggers lease recovery on next attempt to write-for-open.
-      cluster.setLeasePeriod(softLease, hardLease);
-
-      recoverLease(filepath);
+      // test recoverLese from a different client
+      recoverLease(filepath, null);
       verifyFile(dfs, filepath, actual, size);
 
+      // test recoverLease from the same client
+      size = AppendTestUtil.nextInt(FILE_SIZE);
+      filepath = createFile(dfs, size, false);
+      
+      // create another file using the same client
+      Path filepath1 = new Path("/foo" + AppendTestUtil.nextInt());
+      FSDataOutputStream stm = dfs.create(filepath1, true,
+          bufferSize, REPLICATION_NUM, BLOCK_SIZE);
+      
+      // recover the first file
+      recoverLease(filepath, dfs);
+      verifyFile(dfs, filepath, actual, size);
+      
+      // continue to write to the second file
+      stm.write(buffer, 0, size);
+      stm.close();
+      verifyFile(dfs, filepath1, actual, size);
     }
     finally {
       try {
-        if (cluster != null) {cluster.shutdown();}
+        if (cluster != null) {cluster.getFileSystem().close(); cluster.shutdown();}
       } catch (Exception e) {
         // ignore
       }
     }
   }
   
-  private void recoverLease(Path filepath) throws IOException {
-    Configuration conf2 = new Configuration(conf);
-    String username = UserGroupInformation.getCurrentUGI().getUserName()+"_1";
-    UnixUserGroupInformation.saveToConf(conf2,
-        UnixUserGroupInformation.UGI_PROPERTY_NAME,
-        new UnixUserGroupInformation(username, new String[]{"supergroup"}));
-    DistributedFileSystem dfs2 = (DistributedFileSystem)FileSystem.get(conf2);
-
-    boolean done = false;
-    while (!done) {
-      try {
-        dfs2.recoverLease(filepath);
-        done = true;
-      } catch (IOException ioe) {
-        final String message = ioe.getMessage();
-        if (message.contains(AlreadyBeingCreatedException.class.getSimpleName())) {
-          AppendTestUtil.LOG.info("GOOD! got " + message);
-        }
-        else {
-          AppendTestUtil.LOG.warn("UNEXPECTED IOException", ioe);
-        }
-      }
-
-      if (!done) {
-        AppendTestUtil.LOG.info("sleep " + 1000 + "ms");
-        try {Thread.sleep(5000);} catch (InterruptedException e) {}
-      }
+  private void recoverLease(Path filepath, DistributedFileSystem dfs2) throws Exception {
+    if (dfs2==null) {
+      Configuration conf2 = new Configuration(conf);
+      String username = UserGroupInformation.getCurrentUGI().getUserName()+"_1";
+      UnixUserGroupInformation.saveToConf(conf2,
+          UnixUserGroupInformation.UGI_PROPERTY_NAME,
+          new UnixUserGroupInformation(username, new String[]{"supergroup"}));
+      dfs2 = (DistributedFileSystem)FileSystem.get(conf2);
     }
+    dfs2.recoverLease(filepath);
+    checkFileClose(dfs2, filepath);
   }
   
   // try to re-open the file before closing the previous handle. This
   // should fail but will trigger lease recovery.
-  private Path createFile(DistributedFileSystem dfs, int size) throws IOException, InterruptedException {
+  private Path createFile(DistributedFileSystem dfs, int size,
+      boolean triggerSoftLease) throws IOException, InterruptedException {
     // create a random file name
     String filestr = "/foo" + AppendTestUtil.nextInt();
     System.out.println("filestr=" + filestr);
@@ -142,19 +143,15 @@ public class TestLeaseRecovery2 extends 
     // sync file
     AppendTestUtil.LOG.info("sync");
     stm.sync();
-    AppendTestUtil.LOG.info("leasechecker.interruptAndJoin()");
-    dfs.dfs.leasechecker.interruptAndJoin();
+    if (triggerSoftLease) {
+      AppendTestUtil.LOG.info("leasechecker.interruptAndJoin()");
+      dfs.dfs.leasechecker.interruptAndJoin();
+    }
     return filepath;
   }
   
-  private void recoverLeaseUsingCreate(Path filepath) throws IOException {
-    Configuration conf2 = new Configuration(conf);
-    String username = UserGroupInformation.getCurrentUGI().getUserName()+"_1";
-    UnixUserGroupInformation.saveToConf(conf2,
-        UnixUserGroupInformation.UGI_PROPERTY_NAME,
-        new UnixUserGroupInformation(username, new String[]{"supergroup"}));
-    FileSystem dfs2 = FileSystem.get(conf2);
-
+  private void checkFileClose(FileSystem dfs2, Path filepath)
+    throws IOException {
     boolean done = false;
     for(int i = 0; i < 10 && !done; i++) {
       AppendTestUtil.LOG.info("i=" + i);
@@ -181,7 +178,17 @@ public class TestLeaseRecovery2 extends 
       }
     }
     assertTrue(done);
+  }
 
+  private void recoverLeaseUsingCreate(Path filepath) throws IOException {
+    Configuration conf2 = new Configuration(conf);
+    String username = UserGroupInformation.getCurrentUGI().getUserName()+"_1";
+    UnixUserGroupInformation.saveToConf(conf2,
+        UnixUserGroupInformation.UGI_PROPERTY_NAME,
+        new UnixUserGroupInformation(username, new String[]{"supergroup"}));
+    FileSystem dfs2 = FileSystem.get(conf2);
+    
+    checkFileClose(dfs2, filepath);
   }
   
   private void verifyFile(FileSystem dfs, Path filepath, byte[] actual,


Reply via email to