Author: hairong Date: Fri Jan 7 20:11:38 2011 New Revision: 1056483 URL: http://svn.apache.org/viewvc?rev=1056483&view=rev Log: HDFS-1555. Disallow pipeline recovery if a file is already being lease recovered. Contributed by Hairong Kuang.
Modified: hadoop/common/branches/branch-0.20-append/CHANGES.txt hadoop/common/branches/branch-0.20-append/src/hdfs/org/apache/hadoop/hdfs/DistributedFileSystem.java hadoop/common/branches/branch-0.20-append/src/hdfs/org/apache/hadoop/hdfs/protocol/ClientProtocol.java hadoop/common/branches/branch-0.20-append/src/hdfs/org/apache/hadoop/hdfs/server/datanode/DataNode.java hadoop/common/branches/branch-0.20-append/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java hadoop/common/branches/branch-0.20-append/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java hadoop/common/branches/branch-0.20-append/src/hdfs/org/apache/hadoop/hdfs/server/namenode/LeaseManager.java hadoop/common/branches/branch-0.20-append/src/hdfs/org/apache/hadoop/hdfs/server/namenode/NameNode.java hadoop/common/branches/branch-0.20-append/src/hdfs/org/apache/hadoop/hdfs/server/protocol/DatanodeProtocol.java hadoop/common/branches/branch-0.20-append/src/test/org/apache/hadoop/hdfs/TestLeaseRecovery2.java Modified: hadoop/common/branches/branch-0.20-append/CHANGES.txt URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-append/CHANGES.txt?rev=1056483&r1=1056482&r2=1056483&view=diff ============================================================================== --- hadoop/common/branches/branch-0.20-append/CHANGES.txt (original) +++ hadoop/common/branches/branch-0.20-append/CHANGES.txt Fri Jan 7 20:11:38 2011 @@ -91,6 +91,9 @@ Release 0.20-append - Unreleased HDFS-724. Use a bidirectional heartbeat to detect stuck pipeline. (hairong) + HDFS-1555. Disallow pipelien recovery if a file is already being + lease recovered. 
(hairong) + Release 0.20.3 - Unreleased NEW FEATURES Modified: hadoop/common/branches/branch-0.20-append/src/hdfs/org/apache/hadoop/hdfs/DistributedFileSystem.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-append/src/hdfs/org/apache/hadoop/hdfs/DistributedFileSystem.java?rev=1056483&r1=1056482&r2=1056483&view=diff ============================================================================== --- hadoop/common/branches/branch-0.20-append/src/hdfs/org/apache/hadoop/hdfs/DistributedFileSystem.java (original) +++ hadoop/common/branches/branch-0.20-append/src/hdfs/org/apache/hadoop/hdfs/DistributedFileSystem.java Fri Jan 7 20:11:38 2011 @@ -180,7 +180,8 @@ public class DistributedFileSystem exten } /** - * Trigger the lease reovery of a file + * Start the lease reovery of a file + * * @param f a file * @throws IOException if an error occurs */ Modified: hadoop/common/branches/branch-0.20-append/src/hdfs/org/apache/hadoop/hdfs/protocol/ClientProtocol.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-append/src/hdfs/org/apache/hadoop/hdfs/protocol/ClientProtocol.java?rev=1056483&r1=1056482&r2=1056483&view=diff ============================================================================== --- hadoop/common/branches/branch-0.20-append/src/hdfs/org/apache/hadoop/hdfs/protocol/ClientProtocol.java (original) +++ hadoop/common/branches/branch-0.20-append/src/hdfs/org/apache/hadoop/hdfs/protocol/ClientProtocol.java Fri Jan 7 20:11:38 2011 @@ -124,8 +124,9 @@ public interface ClientProtocol extends public LocatedBlock append(String src, String clientName) throws IOException; /** - * Trigger lease recovery to happen - * @param src path of the file to trigger lease recovery + * Start lease recovery + * + * @param src path of the file to start lease recovery * @param clientName name of the current client * @throws IOException */ Modified: 
hadoop/common/branches/branch-0.20-append/src/hdfs/org/apache/hadoop/hdfs/server/datanode/DataNode.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-append/src/hdfs/org/apache/hadoop/hdfs/server/datanode/DataNode.java?rev=1056483&r1=1056482&r2=1056483&view=diff ============================================================================== --- hadoop/common/branches/branch-0.20-append/src/hdfs/org/apache/hadoop/hdfs/server/datanode/DataNode.java (original) +++ hadoop/common/branches/branch-0.20-append/src/hdfs/org/apache/hadoop/hdfs/server/datanode/DataNode.java Fri Jan 7 20:11:38 2011 @@ -1574,7 +1574,7 @@ public class DataNode extends Configured List<DatanodeID> successList = new ArrayList<DatanodeID>(); - long generationstamp = namenode.nextGenerationStamp(block); + long generationstamp = namenode.nextGenerationStamp(block, closeFile); Block newblock = new Block(block.getBlockId(), block.getNumBytes(), generationstamp); for(BlockRecord r : syncList) { Modified: hadoop/common/branches/branch-0.20-append/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-append/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java?rev=1056483&r1=1056482&r2=1056483&view=diff ============================================================================== --- hadoop/common/branches/branch-0.20-append/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java (original) +++ hadoop/common/branches/branch-0.20-append/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java Fri Jan 7 20:11:38 2011 @@ -893,6 +893,23 @@ class FSDirectory implements FSConstants return fullPathName.toString(); } + /** Return the full path name of the specified inode */ + static String getFullPathName(INode inode) { + // calculate the depth of this inode from root + int depth = 0; + for (INode i = inode; i != null; i = i.parent) { + depth++; + } + INode[] inodes = new INode[depth]; 
+ + // fill up the inodes in the path from this inode to root + for (int i = 0; i < depth; i++) { + inodes[depth-i-1] = inode; + inode = inode.parent; + } + return getFullPathName(inodes, depth-1); + } + /** * Create a directory * If ancestor directories do not exist, automatically create them. Modified: hadoop/common/branches/branch-0.20-append/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-append/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java?rev=1056483&r1=1056482&r2=1056483&view=diff ============================================================================== --- hadoop/common/branches/branch-0.20-append/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java (original) +++ hadoop/common/branches/branch-0.20-append/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java Fri Jan 7 20:11:38 2011 @@ -1051,7 +1051,7 @@ public class FSNamesystem implements FSC try { INode myFile = dir.getFileINode(src); - recoverLeaseInternal(myFile, src, holder, clientMachine); + recoverLeaseInternal(myFile, src, holder, clientMachine, false); try { verifyReplication(src, replication, clientMachine); @@ -1126,11 +1126,11 @@ public class FSNamesystem implements FSC } /** - * Trigger to recover lease; - * When the method returns successfully, the lease has been recovered and - * the file is closed. + * Recover lease; + * Immediately revoke the lease of the current lease holder and start lease + * recovery so that the file can be forced to be closed. 
* - * @param src the path of the file to trigger release + * @param src the path of the file to start lease recovery * @param holder the lease holder's name * @param clientMachine the client machine's name * @throws IOException @@ -1154,11 +1154,11 @@ public class FSNamesystem implements FSC checkPathAccess(src, FsAction.WRITE); } - recoverLeaseInternal(inode, src, holder, clientMachine); + recoverLeaseInternal(inode, src, holder, clientMachine, true); } private void recoverLeaseInternal(INode fileInode, - String src, String holder, String clientMachine) + String src, String holder, String clientMachine, boolean force) throws IOException { if (fileInode != null && fileInode.isUnderConstruction()) { INodeFileUnderConstruction pendingFile = (INodeFileUnderConstruction) fileInode; @@ -1171,7 +1171,7 @@ public class FSNamesystem implements FSC // We found the lease for this file. And surprisingly the original // holder is trying to recreate this file. This should never occur. // - if (lease != null) { + if (!force && lease != null) { Lease leaseFile = leaseManager.getLeaseByPath(src); if (leaseFile != null && leaseFile.equals(lease)) { throw new AlreadyBeingCreatedException( @@ -1190,21 +1190,29 @@ public class FSNamesystem implements FSC " on client " + clientMachine + " because pendingCreates is non-null but no leases found."); } - // - // If the original holder has not renewed in the last SOFTLIMIT - // period, then start lease recovery. - // - if (lease.expiredSoftLimit()) { - LOG.info("startFile: recover lease " + lease + ", src=" + src + + if (force) { + // close now: no need to wait for soft lease expiration and + // close only the file src + LOG.info("recoverLease: recover lease " + lease + ", src=" + src + " from client " + pendingFile.clientName); - internalReleaseLease(lease, src); + internalReleaseLeaseOne(lease, src); + } else { + // + // If the original holder has not renewed in the last SOFTLIMIT + // period, then start lease recovery. 
+ // + if (lease.expiredSoftLimit()) { + LOG.info("startFile: recover lease " + lease + ", src=" + src + + " from client " + pendingFile.clientName); + internalReleaseLease(lease, src); + } + throw new AlreadyBeingCreatedException( + "failed to create file " + src + " for " + holder + + " on client " + clientMachine + + ", because this file is already being created by " + + pendingFile.getClientName() + + " on " + pendingFile.getClientMachine()); } - throw new AlreadyBeingCreatedException( - "failed to create file " + src + " for " + holder + - " on client " + clientMachine + - ", because this file is already being created by " + - pendingFile.getClientName() + - " on " + pendingFile.getClientMachine()); } } @@ -4848,8 +4856,12 @@ public class FSNamesystem implements FSC /** * Verifies that the block is associated with a file that has a lease. * Increments, logs and then returns the stamp - */ - synchronized long nextGenerationStampForBlock(Block block) throws IOException { + * + * @param block block + * @param fromNN if it is for lease recovery initiated by NameNode + * @return a new generation stamp + */ + synchronized long nextGenerationStampForBlock(Block block, boolean fromNN) throws IOException { if (isInSafeMode()) { throw new SafeModeException("Cannot get nextGenStamp for " + block, safeMode); } @@ -4865,6 +4877,15 @@ public class FSNamesystem implements FSC LOG.info(msg); throw new IOException(msg); } + // Disallow client-initiated recovery once + // NameNode initiated lease recovery starts + if (!fromNN && HdfsConstants.NN_RECOVERY_LEASEHOLDER.equals( + leaseManager.getLeaseByPath(FSDirectory.getFullPathName(fileINode)).getHolder())) { + String msg = block + + "is being recovered by NameNode, ignoring the request from a client"; + LOG.info(msg); + throw new IOException(msg); + } if (!((INodeFileUnderConstruction)fileINode).setLastRecoveryTime(now())) { String msg = block + " is already being recovered, ignoring this request."; LOG.info(msg); Modified: 
hadoop/common/branches/branch-0.20-append/src/hdfs/org/apache/hadoop/hdfs/server/namenode/LeaseManager.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-append/src/hdfs/org/apache/hadoop/hdfs/server/namenode/LeaseManager.java?rev=1056483&r1=1056482&r2=1056483&view=diff ============================================================================== --- hadoop/common/branches/branch-0.20-append/src/hdfs/org/apache/hadoop/hdfs/server/namenode/LeaseManager.java (original) +++ hadoop/common/branches/branch-0.20-append/src/hdfs/org/apache/hadoop/hdfs/server/namenode/LeaseManager.java Fri Jan 7 20:11:38 2011 @@ -201,6 +201,12 @@ public class LeaseManager { this.holder = holder; renew(); } + + /** Get the holder of the lease */ + public String getHolder() { + return holder; + } + /** Only LeaseManager object can renew a lease */ private void renew() { this.lastUpdate = FSNamesystem.now(); Modified: hadoop/common/branches/branch-0.20-append/src/hdfs/org/apache/hadoop/hdfs/server/namenode/NameNode.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-append/src/hdfs/org/apache/hadoop/hdfs/server/namenode/NameNode.java?rev=1056483&r1=1056482&r2=1056483&view=diff ============================================================================== --- hadoop/common/branches/branch-0.20-append/src/hdfs/org/apache/hadoop/hdfs/server/namenode/NameNode.java (original) +++ hadoop/common/branches/branch-0.20-append/src/hdfs/org/apache/hadoop/hdfs/server/namenode/NameNode.java Fri Jan 7 20:11:38 2011 @@ -497,8 +497,8 @@ public class NameNode implements ClientP } /** {@inheritDoc} */ - public long nextGenerationStamp(Block block) throws IOException{ - return namesystem.nextGenerationStampForBlock(block); + public long nextGenerationStamp(Block block, boolean fromNN) throws IOException{ + return namesystem.nextGenerationStampForBlock(block, fromNN); } /** {@inheritDoc} */ Modified:
hadoop/common/branches/branch-0.20-append/src/hdfs/org/apache/hadoop/hdfs/server/protocol/DatanodeProtocol.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-append/src/hdfs/org/apache/hadoop/hdfs/server/protocol/DatanodeProtocol.java?rev=1056483&r1=1056482&r2=1056483&view=diff ============================================================================== --- hadoop/common/branches/branch-0.20-append/src/hdfs/org/apache/hadoop/hdfs/server/protocol/DatanodeProtocol.java (original) +++ hadoop/common/branches/branch-0.20-append/src/hdfs/org/apache/hadoop/hdfs/server/protocol/DatanodeProtocol.java Fri Jan 7 20:11:38 2011 @@ -35,10 +35,10 @@ import org.apache.hadoop.ipc.VersionedPr **********************************************************************/ public interface DatanodeProtocol extends VersionedProtocol { /** - * 19: SendHeartbeat returns an array of DatanodeCommand objects - * in stead of a DatanodeCommand object. + * 20: nextGenerationStamp has a new parameter indicating if it is for + * NameNode initiated lease recovery or not */ - public static final long versionID = 19L; + public static final long versionID = 20L; // error code final static int NOTIFY = 0; @@ -142,10 +142,14 @@ public interface DatanodeProtocol extend public void reportBadBlocks(LocatedBlock[] blocks) throws IOException; /** - * @return the next GenerationStamp to be associated with the specified - * block. + * Get the next GenerationStamp to be associated with the specified + * block. 
+ * + * @param block block + * @param fromNN if it is for lease recovery initiated by NameNode + * @return a new generation stamp */ - public long nextGenerationStamp(Block block) throws IOException; + public long nextGenerationStamp(Block block, boolean fromNN) throws IOException; /** * Commit block synchronization in lease recovery Modified: hadoop/common/branches/branch-0.20-append/src/test/org/apache/hadoop/hdfs/TestLeaseRecovery2.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-append/src/test/org/apache/hadoop/hdfs/TestLeaseRecovery2.java?rev=1056483&r1=1056482&r2=1056483&view=diff ============================================================================== --- hadoop/common/branches/branch-0.20-append/src/test/org/apache/hadoop/hdfs/TestLeaseRecovery2.java (original) +++ hadoop/common/branches/branch-0.20-append/src/test/org/apache/hadoop/hdfs/TestLeaseRecovery2.java Fri Jan 7 20:11:38 2011 @@ -64,7 +64,7 @@ public class TestLeaseRecovery2 extends //create a file DistributedFileSystem dfs = (DistributedFileSystem)cluster.getFileSystem(); int size = AppendTestUtil.nextInt(FILE_SIZE); - Path filepath = createFile(dfs, size); + Path filepath = createFile(dfs, size, true); // set the soft limit to be 1 second so that the // namenode triggers lease recovery on next attempt to write-for-open. @@ -74,59 +74,60 @@ public class TestLeaseRecovery2 extends verifyFile(dfs, filepath, actual, size); //test recoverLease + // set the soft limit to be 1 hour but recoverLease should + // close the file immediately + cluster.setLeasePeriod(hardLease, hardLease); size = AppendTestUtil.nextInt(FILE_SIZE); - filepath = createFile(dfs, size); + filepath = createFile(dfs, size, false); - // set the soft limit to be 1 second so that the - // namenode triggers lease recovery on next attempt to write-for-open. 
- cluster.setLeasePeriod(softLease, hardLease); - - recoverLease(filepath); + // test recoverLese from a different client + recoverLease(filepath, null); verifyFile(dfs, filepath, actual, size); + // test recoverlease from the same client + size = AppendTestUtil.nextInt(FILE_SIZE); + filepath = createFile(dfs, size, false); + + // create another file using the same client + Path filepath1 = new Path("/foo" + AppendTestUtil.nextInt()); + FSDataOutputStream stm = dfs.create(filepath1, true, + bufferSize, REPLICATION_NUM, BLOCK_SIZE); + + // recover the first file + recoverLease(filepath, dfs); + verifyFile(dfs, filepath, actual, size); + + // continue to write to the second file + stm.write(buffer, 0, size); + stm.close(); + verifyFile(dfs, filepath1, actual, size); } finally { try { - if (cluster != null) {cluster.shutdown();} + if (cluster != null) {cluster.getFileSystem().close(); cluster.shutdown();} } catch (Exception e) { // ignore } } } - private void recoverLease(Path filepath) throws IOException { - Configuration conf2 = new Configuration(conf); - String username = UserGroupInformation.getCurrentUGI().getUserName()+"_1"; - UnixUserGroupInformation.saveToConf(conf2, - UnixUserGroupInformation.UGI_PROPERTY_NAME, - new UnixUserGroupInformation(username, new String[]{"supergroup"})); - DistributedFileSystem dfs2 = (DistributedFileSystem)FileSystem.get(conf2); - - boolean done = false; - while (!done) { - try { - dfs2.recoverLease(filepath); - done = true; - } catch (IOException ioe) { - final String message = ioe.getMessage(); - if (message.contains(AlreadyBeingCreatedException.class.getSimpleName())) { - AppendTestUtil.LOG.info("GOOD! 
got " + message); - } - else { - AppendTestUtil.LOG.warn("UNEXPECTED IOException", ioe); - } - } - - if (!done) { - AppendTestUtil.LOG.info("sleep " + 1000 + "ms"); - try {Thread.sleep(5000);} catch (InterruptedException e) {} - } + private void recoverLease(Path filepath, DistributedFileSystem dfs2) throws Exception { + if (dfs2==null) { + Configuration conf2 = new Configuration(conf); + String username = UserGroupInformation.getCurrentUGI().getUserName()+"_1"; + UnixUserGroupInformation.saveToConf(conf2, + UnixUserGroupInformation.UGI_PROPERTY_NAME, + new UnixUserGroupInformation(username, new String[]{"supergroup"})); + dfs2 = (DistributedFileSystem)FileSystem.get(conf2); } + dfs2.recoverLease(filepath); + checkFileClose(dfs2, filepath); } // try to re-open the file before closing the previous handle. This // should fail but will trigger lease recovery. - private Path createFile(DistributedFileSystem dfs, int size) throws IOException, InterruptedException { + private Path createFile(DistributedFileSystem dfs, int size, + boolean triggerSoftLease) throws IOException, InterruptedException { // create a random file name String filestr = "/foo" + AppendTestUtil.nextInt(); System.out.println("filestr=" + filestr); @@ -142,19 +143,15 @@ public class TestLeaseRecovery2 extends // sync file AppendTestUtil.LOG.info("sync"); stm.sync(); - AppendTestUtil.LOG.info("leasechecker.interruptAndJoin()"); - dfs.dfs.leasechecker.interruptAndJoin(); + if (triggerSoftLease) { + AppendTestUtil.LOG.info("leasechecker.interruptAndJoin()"); + dfs.dfs.leasechecker.interruptAndJoin(); + } return filepath; } - private void recoverLeaseUsingCreate(Path filepath) throws IOException { - Configuration conf2 = new Configuration(conf); - String username = UserGroupInformation.getCurrentUGI().getUserName()+"_1"; - UnixUserGroupInformation.saveToConf(conf2, - UnixUserGroupInformation.UGI_PROPERTY_NAME, - new UnixUserGroupInformation(username, new String[]{"supergroup"})); - FileSystem dfs2 = 
FileSystem.get(conf2); - + private void checkFileClose(FileSystem dfs2, Path filepath) + throws IOException { boolean done = false; for(int i = 0; i < 10 && !done; i++) { AppendTestUtil.LOG.info("i=" + i); @@ -181,7 +178,17 @@ public class TestLeaseRecovery2 extends } } assertTrue(done); + } + private void recoverLeaseUsingCreate(Path filepath) throws IOException { + Configuration conf2 = new Configuration(conf); + String username = UserGroupInformation.getCurrentUGI().getUserName()+"_1"; + UnixUserGroupInformation.saveToConf(conf2, + UnixUserGroupInformation.UGI_PROPERTY_NAME, + new UnixUserGroupInformation(username, new String[]{"supergroup"})); + FileSystem dfs2 = FileSystem.get(conf2); + + checkFileClose(dfs2, filepath); } private void verifyFile(FileSystem dfs, Path filepath, byte[] actual,