Author: szetszwo Date: Fri Nov 9 20:52:24 2012 New Revision: 1407626 URL: http://svn.apache.org/viewvc?rev=1407626&view=rev Log: svn merge -c 1407625 from branch-1 for HDFS-4161. Backport HDFS-1865 "Share LeaseChecker thread among DFSClients" and the related JIRAs: HDFS-278, HDFS-1840, HDFS-1870, HDFS-1890, HDFS-2810, HDFS-3646 and HDFS-2240.
Added: hadoop/common/branches/branch-1.1/src/hdfs/org/apache/hadoop/hdfs/LeaseRenewer.java - copied unchanged from r1407625, hadoop/common/branches/branch-1/src/hdfs/org/apache/hadoop/hdfs/LeaseRenewer.java Modified: hadoop/common/branches/branch-1.1/ (props changed) hadoop/common/branches/branch-1.1/CHANGES.txt (contents, props changed) hadoop/common/branches/branch-1.1/src/core/org/apache/hadoop/ipc/Client.java hadoop/common/branches/branch-1.1/src/hdfs/org/apache/hadoop/hdfs/DFSClient.java hadoop/common/branches/branch-1.1/src/test/org/apache/hadoop/hdfs/AppendTestUtil.java hadoop/common/branches/branch-1.1/src/test/org/apache/hadoop/hdfs/TestDistributedFileSystem.java hadoop/common/branches/branch-1.1/src/test/org/apache/hadoop/hdfs/TestFileAppend4.java hadoop/common/branches/branch-1.1/src/test/org/apache/hadoop/hdfs/TestFileCreation.java hadoop/common/branches/branch-1.1/src/test/org/apache/hadoop/hdfs/TestLease.java hadoop/common/branches/branch-1.1/src/test/org/apache/hadoop/hdfs/TestLeaseRecovery2.java Propchange: hadoop/common/branches/branch-1.1/ ------------------------------------------------------------------------------ Merged /hadoop/common/branches/branch-1:r1407625 Modified: hadoop/common/branches/branch-1.1/CHANGES.txt URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1.1/CHANGES.txt?rev=1407626&r1=1407625&r2=1407626&view=diff ============================================================================== --- hadoop/common/branches/branch-1.1/CHANGES.txt (original) +++ hadoop/common/branches/branch-1.1/CHANGES.txt Fri Nov 9 20:52:24 2012 @@ -13,6 +13,10 @@ Release 1.1.1 - Unreleased HADOOP-8995. Remove unnecessary bogus exception from Configuration.java. (Jing Zhao via suresh) + HDFS-4161. Backport HDFS-1865 "Share LeaseChecker thread among DFSClients" + and the related JIRAs: HDFS-278, HDFS-1840, HDFS-1870, HDFS-1890, HDFS-2810, + HDFS-3646 and HDFS-2240. (szetszwo) + BUG FIXES HADOOP-8878. Uppercase namenode hostname causes hadoop dfs calls with Propchange: hadoop/common/branches/branch-1.1/CHANGES.txt ------------------------------------------------------------------------------ Merged /hadoop/common/branches/branch-1/CHANGES.txt:r1407625 Modified: hadoop/common/branches/branch-1.1/src/core/org/apache/hadoop/ipc/Client.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1.1/src/core/org/apache/hadoop/ipc/Client.java?rev=1407626&r1=1407625&r2=1407626&view=diff ============================================================================== --- hadoop/common/branches/branch-1.1/src/core/org/apache/hadoop/ipc/Client.java (original) +++ hadoop/common/branches/branch-1.1/src/core/org/apache/hadoop/ipc/Client.java Fri Nov 9 20:52:24 2012 @@ -114,6 +114,21 @@ public class Client { } /** + * The time after which a RPC will timeout. If ping is not enabled (via + * ipc.client.ping), then the timeout value is the same as the pingInterval. + * If ping is enabled, then there is no timeout value. + * + * @param conf Configuration + * @return the timeout period in milliseconds. -1 if no timeout value is set + */ + final public static int getTimeout(Configuration conf) { + if (!conf.getBoolean("ipc.client.ping", true)) { + return getPingInterval(conf); + } + return -1; + } + + /** * Increment this client's reference count * */ Modified: hadoop/common/branches/branch-1.1/src/hdfs/org/apache/hadoop/hdfs/DFSClient.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1.1/src/hdfs/org/apache/hadoop/hdfs/DFSClient.java?rev=1407626&r1=1407625&r2=1407626&view=diff ============================================================================== --- hadoop/common/branches/branch-1.1/src/hdfs/org/apache/hadoop/hdfs/DFSClient.java (original) +++ hadoop/common/branches/branch-1.1/src/hdfs/org/apache/hadoop/hdfs/DFSClient.java Fri Nov 9 20:52:24 2012 @@ -80,13 +80,12 @@ public class DFSClient implements FSCons public static final int MAX_BLOCK_ACQUIRE_FAILURES = 3; private static final int TCP_WINDOW_SIZE = 128 * 1024; // 128 KB public final ClientProtocol namenode; - private final ClientProtocol rpcNamenode; + final ClientProtocol rpcNamenode; private final InetSocketAddress nnAddress; final UserGroupInformation ugi; volatile boolean clientRunning = true; - Random r = new Random(); + static Random r = new Random(); final String clientName; - final LeaseChecker leasechecker = new LeaseChecker(); private Configuration conf; private long defaultBlockSize; private short defaultReplication; @@ -108,7 +107,18 @@ public class DFSClient implements FSCons */ private volatile boolean serverSupportsHdfs630 = true; private volatile boolean serverSupportsHdfs200 = true; - + final int hdfsTimeout; // timeout value for a DFS operation. + private final String authority; + + /** + * A map from file names to {@link DFSOutputStream} objects + * that are currently being written by this client. + * Note that a file can only be written by a single client. + */ + private final Map<String, DFSOutputStream> filesBeingWritten + = new HashMap<String, DFSOutputStream>(); + + /** Create a {@link NameNode} proxy */ public static ClientProtocol createNamenode(Configuration conf) throws IOException { return createNamenode(NameNode.getAddress(conf), conf); } @@ -251,14 +261,14 @@ public class DFSClient implements FSCons this.writePacketSize = conf.getInt("dfs.write.packet.size", 64*1024); this.maxBlockAcquireFailures = getMaxBlockAcquireFailures(conf); + this.hdfsTimeout = Client.getTimeout(conf); ugi = UserGroupInformation.getCurrentUser(); + this.authority = nameNodeAddr == null? "null": + nameNodeAddr.getHostName() + ":" + nameNodeAddr.getPort(); + String taskId = conf.get("mapred.task.id", "NONMAPREDUCE"); + this.clientName = "DFSClient_" + taskId + "_" + + r.nextInt() + "_" + Thread.currentThread().getId(); - String taskId = conf.get("mapred.task.id"); - if (taskId != null) { - this.clientName = "DFSClient_" + taskId; - } else { - this.clientName = "DFSClient_" + r.nextInt(); - } defaultBlockSize = conf.getLong("dfs.block.size", DEFAULT_BLOCK_SIZE); defaultReplication = (short) conf.getInt("dfs.replication", 3); @@ -310,20 +320,116 @@ public class DFSClient implements FSCons throw result; } } + + /** Return the lease renewer instance. The renewer thread won't start + * until the first output stream is created. The same instance will + * be returned until all output streams are closed. + */ + public synchronized LeaseRenewer getLeaseRenewer() throws IOException { + return LeaseRenewer.getInstance(authority, ugi, this); + } + + /** Get a lease and start automatic renewal */ + private void beginFileLease(final String src, final DFSOutputStream out) + throws IOException { + getLeaseRenewer().put(src, out, this); + } + + /** Stop renewal of lease for the file. */ + void endFileLease(final String src) throws IOException { + getLeaseRenewer().closeFile(src, this); + } + + /** Put a file. Only called from LeaseRenewer, where proper locking is + * enforced to consistently update its local dfsclients array and + * client's filesBeingWritten map. + */ + void putFileBeingWritten(final String src, final DFSOutputStream out) { + synchronized(filesBeingWritten) { + filesBeingWritten.put(src, out); + } + } + + /** Remove a file. Only called from LeaseRenewer. */ + void removeFileBeingWritten(final String src) { + synchronized(filesBeingWritten) { + filesBeingWritten.remove(src); + } + } + + /** Is file-being-written map empty? */ + boolean isFilesBeingWrittenEmpty() { + synchronized(filesBeingWritten) { + return filesBeingWritten.isEmpty(); + } + } + + /** + * Renew leases. + * @return true if lease was renewed. May return false if this + * client has been closed or has no files open. + **/ + boolean renewLease() throws IOException { + if (clientRunning && !isFilesBeingWrittenEmpty()) { + namenode.renewLease(clientName); + return true; + } + return false; + } + + /** Abort and release resources held. Ignore all errors. */ + void abort() { + clientRunning = false; + closeAllFilesBeingWritten(true); + + try { + // remove reference to this client and stop the renewer, + // if there is no more clients under the renewer. + getLeaseRenewer().closeClient(this); + } catch (IOException ioe) { + LOG.info("Exception occurred while aborting the client. " + ioe); + } + RPC.stopProxy(rpcNamenode); // close connections to the namenode + } + + /** Close/abort all files being written. */ + private void closeAllFilesBeingWritten(final boolean abort) { + for(;;) { + final String src; + final DFSOutputStream out; + synchronized(filesBeingWritten) { + if (filesBeingWritten.isEmpty()) { + return; + } + src = filesBeingWritten.keySet().iterator().next(); + out = filesBeingWritten.remove(src); + } + if (out != null) { + try { + if (abort) { + out.abort(); + } else { + out.close(); + } + } catch(IOException ie) { + LOG.error("Failed to " + (abort? "abort": "close") + " file " + src, + ie); + } + } + } + } + /** * Close the file system, abandoning all of the leases and files being * created and close connections to the namenode. */ public synchronized void close() throws IOException { if(clientRunning) { - leasechecker.close(); + closeAllFilesBeingWritten(false); clientRunning = false; - try { - leasechecker.interruptAndJoin(); - } catch (InterruptedException ie) { - } - + + getLeaseRenewer().closeClient(this); // close connections to the namenode RPC.stopProxy(rpcNamenode); } @@ -757,10 +863,10 @@ public class DFSClient implements FSCons } FsPermission masked = permission.applyUMask(FsPermission.getUMask(conf)); LOG.debug(src + ": masked=" + masked); - OutputStream result = new DFSOutputStream(src, masked, + final DFSOutputStream result = new DFSOutputStream(src, masked, overwrite, createParent, replication, blockSize, progress, buffersize, conf.getInt("io.bytes.per.checksum", 512)); - leasechecker.put(src, result); + beginFileLease(src, result); return result; } @@ -815,7 +921,7 @@ public class DFSClient implements FSCons } final DFSOutputStream result = new DFSOutputStream(src, buffersize, progress, lastBlock, stat, conf.getInt("io.bytes.per.checksum", 512)); - leasechecker.put(src, result); + beginFileLease(src, result); return result; } @@ -1392,117 +1498,6 @@ public class DFSClient implements FSCons throw new IOException("No live nodes contain current block"); } - boolean isLeaseCheckerStarted() { - return leasechecker.daemon != null; - } - - /** Lease management*/ - class LeaseChecker implements Runnable { - /** A map from src -> DFSOutputStream of files that are currently being - * written by this client. - */ - private final SortedMap<String, OutputStream> pendingCreates - = new TreeMap<String, OutputStream>(); - - private Daemon daemon = null; - - synchronized void put(String src, OutputStream out) { - if (clientRunning) { - if (daemon == null) { - daemon = new Daemon(this); - daemon.start(); - } - pendingCreates.put(src, out); - } - } - - synchronized void remove(String src) { - pendingCreates.remove(src); - } - - void interruptAndJoin() throws InterruptedException { - Daemon daemonCopy = null; - synchronized (this) { - if (daemon != null) { - daemon.interrupt(); - daemonCopy = daemon; - } - } - - if (daemonCopy != null) { - LOG.debug("Wait for lease checker to terminate"); - daemonCopy.join(); - } - } - - void close() { - while (true) { - String src; - OutputStream out; - synchronized (this) { - if (pendingCreates.isEmpty()) { - return; - } - src = pendingCreates.firstKey(); - out = pendingCreates.remove(src); - } - if (out != null) { - try { - out.close(); - } catch (IOException ie) { - LOG.error("Exception closing file " + src+ " : " + ie, ie); - } - } - } - } - - private void renew() throws IOException { - synchronized(this) { - if (pendingCreates.isEmpty()) { - return; - } - } - namenode.renewLease(clientName); - } - - /** - * Periodically check in with the namenode and renew all the leases - * when the lease period is half over. - */ - public void run() { - long lastRenewed = 0; - while (clientRunning && !Thread.interrupted()) { - if (System.currentTimeMillis() - lastRenewed > (LEASE_SOFTLIMIT_PERIOD / 2)) { - try { - renew(); - lastRenewed = System.currentTimeMillis(); - } catch (IOException ie) { - LOG.warn("Problem renewing lease for " + clientName, ie); - } - } - - try { - Thread.sleep(1000); - } catch (InterruptedException ie) { - if (LOG.isDebugEnabled()) { - LOG.debug(this + " is interrupted.", ie); - } - return; - } - } - } - - /** {@inheritDoc} */ - public String toString() { - String s = getClass().getSimpleName(); - if (LOG.isTraceEnabled()) { - return s + "@" + DFSClient.this + ": " - + StringUtils.stringifyException(new Throwable("for testing")); - } - return s; - } - } - /** Utility class to encapsulate data node info and its address. */ private static class DNAddrPair { DatanodeInfo info; @@ -3991,12 +3986,12 @@ public class DFSClient implements FSCons throw e; } closeInternal(); - leasechecker.remove(src); if (s != null) { s.close(); s = null; } + endFileLease(src); } /** @@ -4009,6 +4004,20 @@ public class DFSClient implements FSCons closed = true; } + /** + * Aborts this output stream and releases any system + * resources associated with this stream. + */ + synchronized void abort() throws IOException { + if (closed) { + return; + } + setLastException(new IOException("Lease timeout of " + + (hdfsTimeout / 1000) + " seconds expired.")); + closeThreads(); + endFileLease(src); + } + // shutdown datastreamer and responseprocessor threads. private void closeThreads() throws IOException { try { @@ -4077,6 +4086,16 @@ public class DFSClient implements FSCons while (!fileComplete) { fileComplete = namenode.complete(src, clientName); if (!fileComplete) { + if (!clientRunning || + (hdfsTimeout > 0 && + localstart + hdfsTimeout < System.currentTimeMillis())) { + String msg = "Unable to close file because dfsclient " + + " was unable to contact the HDFS servers." + + " clientRunning " + clientRunning + + " hdfsTimeout " + hdfsTimeout; + LOG.info(msg); + throw new IOException(msg); + } try { Thread.sleep(400); if (System.currentTimeMillis() - localstart > 5000) { Modified: hadoop/common/branches/branch-1.1/src/test/org/apache/hadoop/hdfs/AppendTestUtil.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1.1/src/test/org/apache/hadoop/hdfs/AppendTestUtil.java?rev=1407626&r1=1407625&r2=1407626&view=diff ============================================================================== --- hadoop/common/branches/branch-1.1/src/test/org/apache/hadoop/hdfs/AppendTestUtil.java (original) +++ hadoop/common/branches/branch-1.1/src/test/org/apache/hadoop/hdfs/AppendTestUtil.java Fri Nov 9 20:52:24 2012 @@ -165,7 +165,7 @@ public class AppendTestUtil { LOG.info("leasechecker.interruptAndJoin()"); // lose the lease on the client DistributedFileSystem dfs = (DistributedFileSystem)whichfs; - dfs.dfs.leasechecker.interruptAndJoin(); + dfs.dfs.getLeaseRenewer().interruptAndJoin(); } public static void recoverFile(MiniDFSCluster cluster, FileSystem fs, Modified: hadoop/common/branches/branch-1.1/src/test/org/apache/hadoop/hdfs/TestDistributedFileSystem.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1.1/src/test/org/apache/hadoop/hdfs/TestDistributedFileSystem.java?rev=1407626&r1=1407625&r2=1407626&view=diff ============================================================================== --- hadoop/common/branches/branch-1.1/src/test/org/apache/hadoop/hdfs/TestDistributedFileSystem.java (original) +++ hadoop/common/branches/branch-1.1/src/test/org/apache/hadoop/hdfs/TestDistributedFileSystem.java Fri Nov 9 20:52:24 2012 @@ -46,6 +46,10 @@ import org.junit.Test; public class TestDistributedFileSystem { private static final Random RAN = new Random(); + { + ((Log4JLogger)DFSClient.LOG).getLogger().setLevel(Level.ALL); + } + private boolean dualPortTesting = false; private Configuration getTestConfiguration() { @@ -102,40 +106,108 @@ public class TestDistributedFileSystem { @Test public void testDFSClient() throws Exception { Configuration conf = getTestConfiguration(); + final long grace = 1000L; MiniDFSCluster cluster = null; try { cluster = new MiniDFSCluster(conf, 2, true, null); - final Path filepath = new Path("/test/LeaseChecker/foo"); + final String filepathstring = "/test/LeaseChecker/foo"; + final Path[] filepaths = new Path[4]; + for(int i = 0; i < filepaths.length; i++) { + filepaths[i] = new Path(filepathstring + i); + } final long millis = System.currentTimeMillis(); { DistributedFileSystem dfs = (DistributedFileSystem)cluster.getFileSystem(); - assertFalse(dfs.dfs.isLeaseCheckerStarted()); + dfs.dfs.getLeaseRenewer().setGraceSleepPeriod(grace); + assertFalse(dfs.dfs.getLeaseRenewer().isRunning()); - //create a file - FSDataOutputStream out = dfs.create(filepath); - assertTrue(dfs.dfs.isLeaseCheckerStarted()); - - //write something and close - out.writeLong(millis); - assertTrue(dfs.dfs.isLeaseCheckerStarted()); - out.close(); - assertTrue(dfs.dfs.isLeaseCheckerStarted()); + { + //create a file + final FSDataOutputStream out = dfs.create(filepaths[0]); + assertTrue(dfs.dfs.getLeaseRenewer().isRunning()); + //write something + out.writeLong(millis); + assertTrue(dfs.dfs.getLeaseRenewer().isRunning()); + //close + out.close(); + Thread.sleep(grace/4*3); + //within grace period + assertTrue(dfs.dfs.getLeaseRenewer().isRunning()); + for(int i = 0; i < 3; i++) { + if (dfs.dfs.getLeaseRenewer().isRunning()) { + Thread.sleep(grace/2); + } + } + //passed grace period + assertFalse(dfs.dfs.getLeaseRenewer().isRunning()); + } + + { + //create file1 + final FSDataOutputStream out1 = dfs.create(filepaths[1]); + assertTrue(dfs.dfs.getLeaseRenewer().isRunning()); + //create file2 + final FSDataOutputStream out2 = dfs.create(filepaths[2]); + assertTrue(dfs.dfs.getLeaseRenewer().isRunning()); + + //write something to file1 + out1.writeLong(millis); + assertTrue(dfs.dfs.getLeaseRenewer().isRunning()); + //close file1 + out1.close(); + assertTrue(dfs.dfs.getLeaseRenewer().isRunning()); + + //write something to file2 + out2.writeLong(millis); + assertTrue(dfs.dfs.getLeaseRenewer().isRunning()); + //close file2 + out2.close(); + Thread.sleep(grace/4*3); + //within grace period + assertTrue(dfs.dfs.getLeaseRenewer().isRunning()); + } + + { + //create file3 + final FSDataOutputStream out3 = dfs.create(filepaths[3]); + assertTrue(dfs.dfs.getLeaseRenewer().isRunning()); + Thread.sleep(grace/4*3); + //passed previous grace period, should still running + assertTrue(dfs.dfs.getLeaseRenewer().isRunning()); + //write something to file3 + out3.writeLong(millis); + assertTrue(dfs.dfs.getLeaseRenewer().isRunning()); + //close file3 + out3.close(); + assertTrue(dfs.dfs.getLeaseRenewer().isRunning()); + Thread.sleep(grace/4*3); + //within grace period + assertTrue(dfs.dfs.getLeaseRenewer().isRunning()); + for(int i = 0; i < 3; i++) { + if (dfs.dfs.getLeaseRenewer().isRunning()) { + Thread.sleep(grace/2); + } + } + //passed grace period + assertFalse(dfs.dfs.getLeaseRenewer().isRunning()); + } + dfs.close(); } { DistributedFileSystem dfs = (DistributedFileSystem)cluster.getFileSystem(); - assertFalse(dfs.dfs.isLeaseCheckerStarted()); + assertFalse(dfs.dfs.getLeaseRenewer().isRunning()); //open and check the file - FSDataInputStream in = dfs.open(filepath); - assertFalse(dfs.dfs.isLeaseCheckerStarted()); + FSDataInputStream in = dfs.open(filepaths[0]); + assertFalse(dfs.dfs.getLeaseRenewer().isRunning()); assertEquals(millis, in.readLong()); - assertFalse(dfs.dfs.isLeaseCheckerStarted()); + assertFalse(dfs.dfs.getLeaseRenewer().isRunning()); in.close(); - assertFalse(dfs.dfs.isLeaseCheckerStarted()); + assertFalse(dfs.dfs.getLeaseRenewer().isRunning()); dfs.close(); } } Modified: hadoop/common/branches/branch-1.1/src/test/org/apache/hadoop/hdfs/TestFileAppend4.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1.1/src/test/org/apache/hadoop/hdfs/TestFileAppend4.java?rev=1407626&r1=1407625&r2=1407626&view=diff ============================================================================== --- hadoop/common/branches/branch-1.1/src/test/org/apache/hadoop/hdfs/TestFileAppend4.java (original) +++ hadoop/common/branches/branch-1.1/src/test/org/apache/hadoop/hdfs/TestFileAppend4.java Fri Nov 9 20:52:24 2012 @@ -656,7 +656,7 @@ public class TestFileAppend4 extends Tes // has not been completed in the NN. // Lose the leases LOG.info("Killing lease checker"); - client.leasechecker.interruptAndJoin(); + client.getLeaseRenewer().interruptAndJoin(); FileSystem fs1 = cluster.getFileSystem(); FileSystem fs2 = AppendTestUtil.createHdfsWithDifferentUsername( @@ -726,7 +726,7 @@ public class TestFileAppend4 extends Tes // has not been completed in the NN. // Lose the leases LOG.info("Killing lease checker"); - client.leasechecker.interruptAndJoin(); + client.getLeaseRenewer().interruptAndJoin(); FileSystem fs1 = cluster.getFileSystem(); FileSystem fs2 = AppendTestUtil.createHdfsWithDifferentUsername( Modified: hadoop/common/branches/branch-1.1/src/test/org/apache/hadoop/hdfs/TestFileCreation.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1.1/src/test/org/apache/hadoop/hdfs/TestFileCreation.java?rev=1407626&r1=1407625&r2=1407626&view=diff ============================================================================== --- hadoop/common/branches/branch-1.1/src/test/org/apache/hadoop/hdfs/TestFileCreation.java (original) +++ hadoop/common/branches/branch-1.1/src/test/org/apache/hadoop/hdfs/TestFileCreation.java Fri Nov 9 20:52:24 2012 @@ -910,6 +910,53 @@ public class TestFileCreation { dfs.close(); } finally { System.out.println("testFsClose successful"); + cluster.shutdown(); + } + } + + // test closing file after cluster is shutdown + public void testFsCloseAfterClusterShutdown() throws IOException { + System.out.println("test testFsCloseAfterClusterShutdown start"); + final int DATANODE_NUM = 3; + + Configuration conf = new Configuration(); + conf.setInt("dfs.replication.min", 3); + conf.setBoolean("ipc.client.ping", false); // hdfs timeout is default 60 seconds + conf.setInt("ipc.ping.interval", 10000); // hdfs timeout is now 10 second + + // create cluster + MiniDFSCluster cluster = new MiniDFSCluster(conf, DATANODE_NUM, true, null); + DistributedFileSystem dfs = null; + try { + cluster.waitActive(); + dfs = (DistributedFileSystem)cluster.getFileSystem(); + + // create a new file. + final String f = DIR + "dhrubashutdown"; + final Path fpath = new Path(f); + FSDataOutputStream out = TestFileCreation.createFile(dfs, fpath, DATANODE_NUM); + out.write("something_dhruba".getBytes()); + out.sync(); // ensure that block is allocated + + // shutdown last datanode in pipeline. + cluster.stopDataNode(2); + + // close file. Since we have set the minReplcatio to 3 but have killed one + // of the three datanodes, the close call will loop until the hdfsTimeout is + // encountered. + boolean hasException = false; + try { + out.close(); + System.out.println("testFsCloseAfterClusterShutdown: Error here"); + } catch (IOException e) { + hasException = true; + } + assertTrue("Failed to close file after cluster shutdown", hasException); + } finally { + System.out.println("testFsCloseAfterClusterShutdown successful"); + if (cluster != null) { + cluster.shutdown(); + } } } } Modified: hadoop/common/branches/branch-1.1/src/test/org/apache/hadoop/hdfs/TestLease.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1.1/src/test/org/apache/hadoop/hdfs/TestLease.java?rev=1407626&r1=1407625&r2=1407626&view=diff ============================================================================== --- hadoop/common/branches/branch-1.1/src/test/org/apache/hadoop/hdfs/TestLease.java (original) +++ hadoop/common/branches/branch-1.1/src/test/org/apache/hadoop/hdfs/TestLease.java Fri Nov 9 20:52:24 2012 @@ -17,25 +17,34 @@ */ package org.apache.hadoop.hdfs; -import java.io.*; +import java.io.DataOutputStream; +import java.io.IOException; +import java.security.PrivilegedExceptionAction; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hdfs.protocol.ClientProtocol; +import org.apache.hadoop.security.UserGroupInformation; +import org.junit.Assert; +import org.junit.Test; +import org.mockito.Mockito; -public class TestLease extends junit.framework.TestCase { +public class TestLease { static boolean hasLease(MiniDFSCluster cluster, Path src) { return cluster.getNameNode().namesystem.leaseManager.getLeaseByPath(src.toString()) != null; } final Path dir = new Path("/test/lease/"); + @Test public void testLease() throws Exception { Configuration conf = new Configuration(); MiniDFSCluster cluster = new MiniDFSCluster(conf, 2, true, null); try { FileSystem fs = cluster.getFileSystem(); - assertTrue(fs.mkdirs(dir)); + Assert.assertTrue(fs.mkdirs(dir)); Path a = new Path(dir, "a"); Path b = new Path(dir, "b"); @@ -43,24 +52,66 @@ public class TestLease extends junit.fra DataOutputStream a_out = fs.create(a); a_out.writeBytes("something"); - assertTrue(hasLease(cluster, a)); - assertTrue(!hasLease(cluster, b)); + Assert.assertTrue(hasLease(cluster, a)); + Assert.assertTrue(!hasLease(cluster, b)); DataOutputStream b_out = fs.create(b); b_out.writeBytes("something"); - assertTrue(hasLease(cluster, a)); - assertTrue(hasLease(cluster, b)); + Assert.assertTrue(hasLease(cluster, a)); + Assert.assertTrue(hasLease(cluster, b)); a_out.close(); b_out.close(); - assertTrue(!hasLease(cluster, a)); - assertTrue(!hasLease(cluster, b)); + Assert.assertTrue(!hasLease(cluster, a)); + Assert.assertTrue(!hasLease(cluster, b)); fs.delete(dir, true); } finally { if (cluster != null) {cluster.shutdown();} } } + + @Test + public void testFactory() throws Exception { + final String[] groups = new String[]{"supergroup"}; + final UserGroupInformation[] ugi = new UserGroupInformation[3]; + for(int i = 0; i < ugi.length; i++) { + ugi[i] = UserGroupInformation.createUserForTesting("user" + i, groups); + } + + final Configuration conf = new Configuration(); + final DFSClient c1 = createDFSClientAs(ugi[0], conf); + FSDataOutputStream out1 = createFsOut(c1, "/out1"); + final DFSClient c2 = createDFSClientAs(ugi[0], conf); + FSDataOutputStream out2 = createFsOut(c2, "/out2"); + Assert.assertEquals(c1.getLeaseRenewer(), c2.getLeaseRenewer()); + final DFSClient c3 = createDFSClientAs(ugi[1], conf); + FSDataOutputStream out3 = createFsOut(c3, "/out3"); + Assert.assertTrue(c1.getLeaseRenewer() != c3.getLeaseRenewer()); + final DFSClient c4 = createDFSClientAs(ugi[1], conf); + FSDataOutputStream out4 = createFsOut(c4, "/out4"); + Assert.assertEquals(c3.getLeaseRenewer(), c4.getLeaseRenewer()); + final DFSClient c5 = createDFSClientAs(ugi[2], conf); + FSDataOutputStream out5 = createFsOut(c5, "/out5"); + Assert.assertTrue(c1.getLeaseRenewer() != c5.getLeaseRenewer()); + Assert.assertTrue(c3.getLeaseRenewer() != c5.getLeaseRenewer()); + } + + private FSDataOutputStream createFsOut(DFSClient dfs, String path) + throws IOException { + return new FSDataOutputStream(dfs.create(path, true), null); + } + + static final ClientProtocol mcp = Mockito.mock(ClientProtocol.class); + static public DFSClient createDFSClientAs(UserGroupInformation ugi, + final Configuration conf) throws Exception { + return ugi.doAs(new PrivilegedExceptionAction<DFSClient>() { + @Override + public DFSClient run() throws Exception { + return new DFSClient(null, mcp, conf, null); + } + }); + } } Modified: hadoop/common/branches/branch-1.1/src/test/org/apache/hadoop/hdfs/TestLeaseRecovery2.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1.1/src/test/org/apache/hadoop/hdfs/TestLeaseRecovery2.java?rev=1407626&r1=1407625&r2=1407626&view=diff ============================================================================== --- hadoop/common/branches/branch-1.1/src/test/org/apache/hadoop/hdfs/TestLeaseRecovery2.java (original) +++ hadoop/common/branches/branch-1.1/src/test/org/apache/hadoop/hdfs/TestLeaseRecovery2.java Fri Nov 9 20:52:24 2012 @@ -157,7 +157,7 @@ public class TestLeaseRecovery2 extends stm.sync(); if (triggerSoftLease) { AppendTestUtil.LOG.info("leasechecker.interruptAndJoin()"); - dfs.dfs.leasechecker.interruptAndJoin(); + dfs.dfs.getLeaseRenewer().interruptAndJoin(); } return filepath; }