Author: szetszwo
Date: Fri Nov  9 20:52:24 2012
New Revision: 1407626

URL: http://svn.apache.org/viewvc?rev=1407626&view=rev
Log:
svn merge -c 1407625 from branch-1 for HDFS-4161. Backport HDFS-1865 "Share 
LeaseChecker thread among DFSClients" and the related JIRAs: HDFS-278, 
HDFS-1840, HDFS-1870, HDFS-1890, HDFS-2810, HDFS-3646 and HDFS-2240.

Added:
    
hadoop/common/branches/branch-1.1/src/hdfs/org/apache/hadoop/hdfs/LeaseRenewer.java
      - copied unchanged from r1407625, 
hadoop/common/branches/branch-1/src/hdfs/org/apache/hadoop/hdfs/LeaseRenewer.java
Modified:
    hadoop/common/branches/branch-1.1/   (props changed)
    hadoop/common/branches/branch-1.1/CHANGES.txt   (contents, props changed)
    hadoop/common/branches/branch-1.1/src/core/org/apache/hadoop/ipc/Client.java
    
hadoop/common/branches/branch-1.1/src/hdfs/org/apache/hadoop/hdfs/DFSClient.java
    
hadoop/common/branches/branch-1.1/src/test/org/apache/hadoop/hdfs/AppendTestUtil.java
    
hadoop/common/branches/branch-1.1/src/test/org/apache/hadoop/hdfs/TestDistributedFileSystem.java
    
hadoop/common/branches/branch-1.1/src/test/org/apache/hadoop/hdfs/TestFileAppend4.java
    
hadoop/common/branches/branch-1.1/src/test/org/apache/hadoop/hdfs/TestFileCreation.java
    
hadoop/common/branches/branch-1.1/src/test/org/apache/hadoop/hdfs/TestLease.java
    
hadoop/common/branches/branch-1.1/src/test/org/apache/hadoop/hdfs/TestLeaseRecovery2.java

Propchange: hadoop/common/branches/branch-1.1/
------------------------------------------------------------------------------
  Merged /hadoop/common/branches/branch-1:r1407625

Modified: hadoop/common/branches/branch-1.1/CHANGES.txt
URL: 
http://svn.apache.org/viewvc/hadoop/common/branches/branch-1.1/CHANGES.txt?rev=1407626&r1=1407625&r2=1407626&view=diff
==============================================================================
--- hadoop/common/branches/branch-1.1/CHANGES.txt (original)
+++ hadoop/common/branches/branch-1.1/CHANGES.txt Fri Nov  9 20:52:24 2012
@@ -13,6 +13,10 @@ Release 1.1.1 - Unreleased
     HADOOP-8995. Remove unnecessary bogus exception from Configuration.java.
     (Jing Zhao via suresh)
 
+    HDFS-4161. Backport HDFS-1865 "Share LeaseChecker thread among DFSClients"
+    and the related JIRAs: HDFS-278, HDFS-1840, HDFS-1870, HDFS-1890, 
HDFS-2810,
+    HDFS-3646 and HDFS-2240. (szetszwo)
+
   BUG FIXES
 
     HADOOP-8878. Uppercase namenode hostname causes hadoop dfs calls with

Propchange: hadoop/common/branches/branch-1.1/CHANGES.txt
------------------------------------------------------------------------------
  Merged /hadoop/common/branches/branch-1/CHANGES.txt:r1407625

Modified: 
hadoop/common/branches/branch-1.1/src/core/org/apache/hadoop/ipc/Client.java
URL: 
http://svn.apache.org/viewvc/hadoop/common/branches/branch-1.1/src/core/org/apache/hadoop/ipc/Client.java?rev=1407626&r1=1407625&r2=1407626&view=diff
==============================================================================
--- 
hadoop/common/branches/branch-1.1/src/core/org/apache/hadoop/ipc/Client.java 
(original)
+++ 
hadoop/common/branches/branch-1.1/src/core/org/apache/hadoop/ipc/Client.java 
Fri Nov  9 20:52:24 2012
@@ -114,6 +114,21 @@ public class Client {
   }
   
   /**
+   * The time after which a RPC will timeout. If ping is not enabled (via
+   * ipc.client.ping), then the timeout value is the same as the pingInterval.
+   * If ping is enabled, then there is no timeout value.
+   * 
+   * @param conf Configuration
+   * @return the timeout period in milliseconds. -1 if no timeout value is set
+   */
+  final public static int getTimeout(Configuration conf) {
+    if (!conf.getBoolean("ipc.client.ping", true)) {
+      return getPingInterval(conf);
+    }
+    return -1;
+  }
+  
+  /**
    * Increment this client's reference count
    *
    */

Modified: 
hadoop/common/branches/branch-1.1/src/hdfs/org/apache/hadoop/hdfs/DFSClient.java
URL: 
http://svn.apache.org/viewvc/hadoop/common/branches/branch-1.1/src/hdfs/org/apache/hadoop/hdfs/DFSClient.java?rev=1407626&r1=1407625&r2=1407626&view=diff
==============================================================================
--- 
hadoop/common/branches/branch-1.1/src/hdfs/org/apache/hadoop/hdfs/DFSClient.java
 (original)
+++ 
hadoop/common/branches/branch-1.1/src/hdfs/org/apache/hadoop/hdfs/DFSClient.java
 Fri Nov  9 20:52:24 2012
@@ -80,13 +80,12 @@ public class DFSClient implements FSCons
   public static final int MAX_BLOCK_ACQUIRE_FAILURES = 3;
   private static final int TCP_WINDOW_SIZE = 128 * 1024; // 128 KB
   public final ClientProtocol namenode;
-  private final ClientProtocol rpcNamenode;
+  final ClientProtocol rpcNamenode;
   private final InetSocketAddress nnAddress;
   final UserGroupInformation ugi;
   volatile boolean clientRunning = true;
-  Random r = new Random();
+  static Random r = new Random();
   final String clientName;
-  final LeaseChecker leasechecker = new LeaseChecker();
   private Configuration conf;
   private long defaultBlockSize;
   private short defaultReplication;
@@ -108,7 +107,18 @@ public class DFSClient implements FSCons
    */
   private volatile boolean serverSupportsHdfs630 = true;
   private volatile boolean serverSupportsHdfs200 = true;
- 
+  final int hdfsTimeout;    // timeout value for a DFS operation.
+  private final String authority;
+
+  /**
+   * A map from file names to {@link DFSOutputStream} objects
+   * that are currently being written by this client.
+   * Note that a file can only be written by a single client.
+   */
+  private final Map<String, DFSOutputStream> filesBeingWritten
+      = new HashMap<String, DFSOutputStream>();
+
+  /** Create a {@link NameNode} proxy */ 
   public static ClientProtocol createNamenode(Configuration conf) throws 
IOException {
     return createNamenode(NameNode.getAddress(conf), conf);
   }
@@ -251,14 +261,14 @@ public class DFSClient implements FSCons
     this.writePacketSize = conf.getInt("dfs.write.packet.size", 64*1024);
     this.maxBlockAcquireFailures = getMaxBlockAcquireFailures(conf);
 
+    this.hdfsTimeout = Client.getTimeout(conf);
     ugi = UserGroupInformation.getCurrentUser();
+    this.authority = nameNodeAddr == null? "null":
+      nameNodeAddr.getHostName() + ":" + nameNodeAddr.getPort();
+    String taskId = conf.get("mapred.task.id", "NONMAPREDUCE");
+    this.clientName = "DFSClient_" + taskId + "_" + 
+        r.nextInt()  + "_" + Thread.currentThread().getId();
 
-    String taskId = conf.get("mapred.task.id");
-    if (taskId != null) {
-      this.clientName = "DFSClient_" + taskId; 
-    } else {
-      this.clientName = "DFSClient_" + r.nextInt();
-    }
     defaultBlockSize = conf.getLong("dfs.block.size", DEFAULT_BLOCK_SIZE);
     defaultReplication = (short) conf.getInt("dfs.replication", 3);
 
@@ -310,20 +320,116 @@ public class DFSClient implements FSCons
       throw result;
     }
   }
+
+  /** Return the lease renewer instance. The renewer thread won't start
+   *  until the first output stream is created. The same instance will
+   *  be returned until all output streams are closed.
+   */
+  public synchronized LeaseRenewer getLeaseRenewer() throws IOException {
+      return LeaseRenewer.getInstance(authority, ugi, this);
+  }
+
+  /** Get a lease and start automatic renewal */
+  private void beginFileLease(final String src, final DFSOutputStream out) 
+      throws IOException {
+    getLeaseRenewer().put(src, out, this);
+  }
+
+  /** Stop renewal of lease for the file. */
+  void endFileLease(final String src) throws IOException {
+    getLeaseRenewer().closeFile(src, this);
+  }
     
+
+  /** Put a file. Only called from LeaseRenewer, where proper locking is
+   *  enforced to consistently update its local dfsclients array and 
+   *  client's filesBeingWritten map.
+   */
+  void putFileBeingWritten(final String src, final DFSOutputStream out) {
+    synchronized(filesBeingWritten) {
+      filesBeingWritten.put(src, out);
+    }
+  }
+
+  /** Remove a file. Only called from LeaseRenewer. */
+  void removeFileBeingWritten(final String src) {
+    synchronized(filesBeingWritten) {
+      filesBeingWritten.remove(src);
+    }
+  }
+
+  /** Is file-being-written map empty? */
+  boolean isFilesBeingWrittenEmpty() {
+    synchronized(filesBeingWritten) {
+      return filesBeingWritten.isEmpty();
+    }
+  }
+
+  /**
+   * Renew leases.
+   * @return true if lease was renewed. May return false if this
+   * client has been closed or has no files open.
+   **/
+  boolean renewLease() throws IOException {
+    if (clientRunning && !isFilesBeingWrittenEmpty()) {
+      namenode.renewLease(clientName);
+      return true;
+    }
+    return false;
+  }
+
+  /** Abort and release resources held.  Ignore all errors. */
+  void abort() {
+    clientRunning = false;
+    closeAllFilesBeingWritten(true);
+    
+    try {
+      // remove reference to this client and stop the renewer,
+      // if there is no more clients under the renewer.
+      getLeaseRenewer().closeClient(this);
+    } catch (IOException ioe) {
+      LOG.info("Exception occurred while aborting the client. " + ioe);
+    }
+    RPC.stopProxy(rpcNamenode); // close connections to the namenode
+  }
+
+  /** Close/abort all files being written. */
+  private void closeAllFilesBeingWritten(final boolean abort) {
+    for(;;) {
+      final String src;
+      final DFSOutputStream out;
+      synchronized(filesBeingWritten) {
+        if (filesBeingWritten.isEmpty()) {
+          return;
+        }
+        src = filesBeingWritten.keySet().iterator().next();
+        out = filesBeingWritten.remove(src);
+      }
+      if (out != null) {
+        try {
+          if (abort) {
+            out.abort();
+          } else {
+            out.close();
+          }
+        } catch(IOException ie) {
+          LOG.error("Failed to " + (abort? "abort": "close") + " file " + src,
+              ie);
+        }
+      }
+    }
+  }
+
   /**
    * Close the file system, abandoning all of the leases and files being
    * created and close connections to the namenode.
    */
   public synchronized void close() throws IOException {
     if(clientRunning) {
-      leasechecker.close();
+      closeAllFilesBeingWritten(false);
       clientRunning = false;
-      try {
-        leasechecker.interruptAndJoin();
-      } catch (InterruptedException ie) {
-      }
-  
+
+      getLeaseRenewer().closeClient(this);
       // close connections to the namenode
       RPC.stopProxy(rpcNamenode);
     }
@@ -757,10 +863,10 @@ public class DFSClient implements FSCons
     }
     FsPermission masked = permission.applyUMask(FsPermission.getUMask(conf));
     LOG.debug(src + ": masked=" + masked);
-    OutputStream result = new DFSOutputStream(src, masked,
+    final DFSOutputStream result = new DFSOutputStream(src, masked,
         overwrite, createParent, replication, blockSize, progress, buffersize,
         conf.getInt("io.bytes.per.checksum", 512));
-    leasechecker.put(src, result);
+    beginFileLease(src, result);
     return result;
   }
 
@@ -815,7 +921,7 @@ public class DFSClient implements FSCons
     }
     final DFSOutputStream result = new DFSOutputStream(src, buffersize, 
progress,
         lastBlock, stat, conf.getInt("io.bytes.per.checksum", 512));
-    leasechecker.put(src, result);
+    beginFileLease(src, result);
     return result;
   }
 
@@ -1392,117 +1498,6 @@ public class DFSClient implements FSCons
     throw new IOException("No live nodes contain current block");
   }
 
-  boolean isLeaseCheckerStarted() {
-    return leasechecker.daemon != null;
-  }
-
-  /** Lease management*/
-  class LeaseChecker implements Runnable {
-    /** A map from src -> DFSOutputStream of files that are currently being
-     * written by this client.
-     */
-    private final SortedMap<String, OutputStream> pendingCreates
-        = new TreeMap<String, OutputStream>();
-
-    private Daemon daemon = null;
-    
-    synchronized void put(String src, OutputStream out) {
-      if (clientRunning) {
-        if (daemon == null) {
-          daemon = new Daemon(this);
-          daemon.start();
-        }
-        pendingCreates.put(src, out);
-      }
-    }
-    
-    synchronized void remove(String src) {
-      pendingCreates.remove(src);
-    }
-    
-    void interruptAndJoin() throws InterruptedException {
-      Daemon daemonCopy = null;
-      synchronized (this) {
-        if (daemon != null) {
-          daemon.interrupt();
-          daemonCopy = daemon;
-        }
-      }
-     
-      if (daemonCopy != null) {
-        LOG.debug("Wait for lease checker to terminate");
-        daemonCopy.join();
-      }
-    }
-
-    void close() {
-      while (true) {
-        String src;
-        OutputStream out;
-        synchronized (this) {
-          if (pendingCreates.isEmpty()) {
-            return;
-          }
-          src = pendingCreates.firstKey();
-          out = pendingCreates.remove(src);
-        }
-        if (out != null) {
-          try {
-            out.close();
-          } catch (IOException ie) {
-            LOG.error("Exception closing file " + src+ " : " + ie, ie);
-          }
-        }
-      }
-    }
-
-    private void renew() throws IOException {
-      synchronized(this) {
-        if (pendingCreates.isEmpty()) {
-          return;
-        }
-      }
-      namenode.renewLease(clientName);
-    }
-
-    /**
-     * Periodically check in with the namenode and renew all the leases
-     * when the lease period is half over.
-     */
-    public void run() {
-      long lastRenewed = 0;
-      while (clientRunning && !Thread.interrupted()) {
-        if (System.currentTimeMillis() - lastRenewed > (LEASE_SOFTLIMIT_PERIOD 
/ 2)) {
-          try {
-            renew();
-            lastRenewed = System.currentTimeMillis();
-          } catch (IOException ie) {
-            LOG.warn("Problem renewing lease for " + clientName, ie);
-          }
-        }
-
-        try {
-          Thread.sleep(1000);
-        } catch (InterruptedException ie) {
-          if (LOG.isDebugEnabled()) {
-            LOG.debug(this + " is interrupted.", ie);
-          }
-          return;
-        }
-      }
-    }
-
-    /** {@inheritDoc} */
-    public String toString() {
-      String s = getClass().getSimpleName();
-      if (LOG.isTraceEnabled()) {
-        return s + "@" + DFSClient.this + ": "
-               + StringUtils.stringifyException(new Throwable("for testing"));
-      }
-      return s;
-    }
-  }
-
   /** Utility class to encapsulate data node info and its address. */
   private static class DNAddrPair {
     DatanodeInfo info;
@@ -3991,12 +3986,12 @@ public class DFSClient implements FSCons
           throw e;
       }
       closeInternal();
-      leasechecker.remove(src);
       
       if (s != null) {
         s.close();
         s = null;
       }
+      endFileLease(src);
     }
     
     /**
@@ -4009,6 +4004,20 @@ public class DFSClient implements FSCons
       closed = true;
     }
  
+    /**
+     * Aborts this output stream and releases any system 
+     * resources associated with this stream.
+     */
+    synchronized void abort() throws IOException {
+      if (closed) {
+        return;
+      }
+      setLastException(new IOException("Lease timeout of "
+          + (hdfsTimeout / 1000) + " seconds expired."));
+      closeThreads();
+      endFileLease(src);
+    }
+ 
     // shutdown datastreamer and responseprocessor threads.
     private void closeThreads() throws IOException {
       try {
@@ -4077,6 +4086,16 @@ public class DFSClient implements FSCons
         while (!fileComplete) {
           fileComplete = namenode.complete(src, clientName);
           if (!fileComplete) {
+            if (!clientRunning ||
+                  (hdfsTimeout > 0 &&
+                   localstart + hdfsTimeout < System.currentTimeMillis())) {
+                String msg = "Unable to close file because dfsclient " +
+                              " was unable to contact the HDFS servers." +
+                              " clientRunning " + clientRunning +
+                              " hdfsTimeout " + hdfsTimeout;
+                LOG.info(msg);
+                throw new IOException(msg);
+            }
             try {
               Thread.sleep(400);
               if (System.currentTimeMillis() - localstart > 5000) {

Modified: 
hadoop/common/branches/branch-1.1/src/test/org/apache/hadoop/hdfs/AppendTestUtil.java
URL: 
http://svn.apache.org/viewvc/hadoop/common/branches/branch-1.1/src/test/org/apache/hadoop/hdfs/AppendTestUtil.java?rev=1407626&r1=1407625&r2=1407626&view=diff
==============================================================================
--- 
hadoop/common/branches/branch-1.1/src/test/org/apache/hadoop/hdfs/AppendTestUtil.java
 (original)
+++ 
hadoop/common/branches/branch-1.1/src/test/org/apache/hadoop/hdfs/AppendTestUtil.java
 Fri Nov  9 20:52:24 2012
@@ -165,7 +165,7 @@ public class AppendTestUtil {
     LOG.info("leasechecker.interruptAndJoin()");
     // lose the lease on the client
     DistributedFileSystem dfs = (DistributedFileSystem)whichfs;
-    dfs.dfs.leasechecker.interruptAndJoin();
+    dfs.dfs.getLeaseRenewer().interruptAndJoin();
   }
   
   public static void recoverFile(MiniDFSCluster cluster, FileSystem fs,

Modified: 
hadoop/common/branches/branch-1.1/src/test/org/apache/hadoop/hdfs/TestDistributedFileSystem.java
URL: 
http://svn.apache.org/viewvc/hadoop/common/branches/branch-1.1/src/test/org/apache/hadoop/hdfs/TestDistributedFileSystem.java?rev=1407626&r1=1407625&r2=1407626&view=diff
==============================================================================
--- 
hadoop/common/branches/branch-1.1/src/test/org/apache/hadoop/hdfs/TestDistributedFileSystem.java
 (original)
+++ 
hadoop/common/branches/branch-1.1/src/test/org/apache/hadoop/hdfs/TestDistributedFileSystem.java
 Fri Nov  9 20:52:24 2012
@@ -46,6 +46,10 @@ import org.junit.Test;
 public class TestDistributedFileSystem {
   private static final Random RAN = new Random();
 
+  {
+    ((Log4JLogger)DFSClient.LOG).getLogger().setLevel(Level.ALL);
+  }
+
   private boolean dualPortTesting = false;
   
   private Configuration getTestConfiguration() {
@@ -102,40 +106,108 @@ public class TestDistributedFileSystem {
   @Test
   public void testDFSClient() throws Exception {
     Configuration conf = getTestConfiguration();
+    final long grace = 1000L;
     MiniDFSCluster cluster = null;
 
     try {
       cluster = new MiniDFSCluster(conf, 2, true, null);
-      final Path filepath = new Path("/test/LeaseChecker/foo");
+      final String filepathstring = "/test/LeaseChecker/foo";
+      final Path[] filepaths = new Path[4];
+      for(int i = 0; i < filepaths.length; i++) {
+        filepaths[i] = new Path(filepathstring + i);
+      }
       final long millis = System.currentTimeMillis();
 
       {
         DistributedFileSystem dfs = 
(DistributedFileSystem)cluster.getFileSystem();
-        assertFalse(dfs.dfs.isLeaseCheckerStarted());
+        dfs.dfs.getLeaseRenewer().setGraceSleepPeriod(grace);
+        assertFalse(dfs.dfs.getLeaseRenewer().isRunning());
   
-        //create a file
-        FSDataOutputStream out = dfs.create(filepath);
-        assertTrue(dfs.dfs.isLeaseCheckerStarted());
-  
-        //write something and close
-        out.writeLong(millis);
-        assertTrue(dfs.dfs.isLeaseCheckerStarted());
-        out.close();
-        assertTrue(dfs.dfs.isLeaseCheckerStarted());
+        {
+          //create a file
+          final FSDataOutputStream out = dfs.create(filepaths[0]);
+          assertTrue(dfs.dfs.getLeaseRenewer().isRunning());
+          //write something
+          out.writeLong(millis);
+          assertTrue(dfs.dfs.getLeaseRenewer().isRunning());
+          //close
+          out.close();
+          Thread.sleep(grace/4*3);
+          //within grace period
+          assertTrue(dfs.dfs.getLeaseRenewer().isRunning());
+          for(int i = 0; i < 3; i++) {
+            if (dfs.dfs.getLeaseRenewer().isRunning()) {
+              Thread.sleep(grace/2);
+            }
+          }
+          //passed grace period
+          assertFalse(dfs.dfs.getLeaseRenewer().isRunning());
+        }
+
+        {
+          //create file1
+          final FSDataOutputStream out1 = dfs.create(filepaths[1]);
+          assertTrue(dfs.dfs.getLeaseRenewer().isRunning());
+          //create file2
+          final FSDataOutputStream out2 = dfs.create(filepaths[2]);
+          assertTrue(dfs.dfs.getLeaseRenewer().isRunning());
+
+          //write something to file1
+          out1.writeLong(millis);
+          assertTrue(dfs.dfs.getLeaseRenewer().isRunning());
+          //close file1
+          out1.close();
+          assertTrue(dfs.dfs.getLeaseRenewer().isRunning());
+
+          //write something to file2
+          out2.writeLong(millis);
+          assertTrue(dfs.dfs.getLeaseRenewer().isRunning());
+          //close file2
+          out2.close();
+          Thread.sleep(grace/4*3);
+          //within grace period
+          assertTrue(dfs.dfs.getLeaseRenewer().isRunning());
+        }
+
+        {
+          //create file3
+          final FSDataOutputStream out3 = dfs.create(filepaths[3]);
+          assertTrue(dfs.dfs.getLeaseRenewer().isRunning());
+          Thread.sleep(grace/4*3);
+          //passed previous grace period, should still running
+          assertTrue(dfs.dfs.getLeaseRenewer().isRunning());
+          //write something to file3
+          out3.writeLong(millis);
+          assertTrue(dfs.dfs.getLeaseRenewer().isRunning());
+          //close file3
+          out3.close();
+          assertTrue(dfs.dfs.getLeaseRenewer().isRunning());
+          Thread.sleep(grace/4*3);
+          //within grace period
+          assertTrue(dfs.dfs.getLeaseRenewer().isRunning());
+          for(int i = 0; i < 3; i++) {
+            if (dfs.dfs.getLeaseRenewer().isRunning()) {
+              Thread.sleep(grace/2);
+            }
+          }
+          //passed grace period
+          assertFalse(dfs.dfs.getLeaseRenewer().isRunning());
+        }
+
         dfs.close();
       }
 
       {
         DistributedFileSystem dfs = 
(DistributedFileSystem)cluster.getFileSystem();
-        assertFalse(dfs.dfs.isLeaseCheckerStarted());
+        assertFalse(dfs.dfs.getLeaseRenewer().isRunning());
 
         //open and check the file
-        FSDataInputStream in = dfs.open(filepath);
-        assertFalse(dfs.dfs.isLeaseCheckerStarted());
+        FSDataInputStream in = dfs.open(filepaths[0]);
+        assertFalse(dfs.dfs.getLeaseRenewer().isRunning());
         assertEquals(millis, in.readLong());
-        assertFalse(dfs.dfs.isLeaseCheckerStarted());
+        assertFalse(dfs.dfs.getLeaseRenewer().isRunning());
         in.close();
-        assertFalse(dfs.dfs.isLeaseCheckerStarted());
+        assertFalse(dfs.dfs.getLeaseRenewer().isRunning());
         dfs.close();
       }
     }

Modified: 
hadoop/common/branches/branch-1.1/src/test/org/apache/hadoop/hdfs/TestFileAppend4.java
URL: 
http://svn.apache.org/viewvc/hadoop/common/branches/branch-1.1/src/test/org/apache/hadoop/hdfs/TestFileAppend4.java?rev=1407626&r1=1407625&r2=1407626&view=diff
==============================================================================
--- 
hadoop/common/branches/branch-1.1/src/test/org/apache/hadoop/hdfs/TestFileAppend4.java
 (original)
+++ 
hadoop/common/branches/branch-1.1/src/test/org/apache/hadoop/hdfs/TestFileAppend4.java
 Fri Nov  9 20:52:24 2012
@@ -656,7 +656,7 @@ public class TestFileAppend4 extends Tes
       // has not been completed in the NN.
       // Lose the leases
       LOG.info("Killing lease checker");
-      client.leasechecker.interruptAndJoin();
+      client.getLeaseRenewer().interruptAndJoin();
 
       FileSystem fs1 = cluster.getFileSystem();
       FileSystem fs2 = AppendTestUtil.createHdfsWithDifferentUsername(
@@ -726,7 +726,7 @@ public class TestFileAppend4 extends Tes
       // has not been completed in the NN.
       // Lose the leases
       LOG.info("Killing lease checker");
-      client.leasechecker.interruptAndJoin();
+      client.getLeaseRenewer().interruptAndJoin();
 
       FileSystem fs1 = cluster.getFileSystem();
       FileSystem fs2 = AppendTestUtil.createHdfsWithDifferentUsername(

Modified: 
hadoop/common/branches/branch-1.1/src/test/org/apache/hadoop/hdfs/TestFileCreation.java
URL: 
http://svn.apache.org/viewvc/hadoop/common/branches/branch-1.1/src/test/org/apache/hadoop/hdfs/TestFileCreation.java?rev=1407626&r1=1407625&r2=1407626&view=diff
==============================================================================
--- 
hadoop/common/branches/branch-1.1/src/test/org/apache/hadoop/hdfs/TestFileCreation.java
 (original)
+++ 
hadoop/common/branches/branch-1.1/src/test/org/apache/hadoop/hdfs/TestFileCreation.java
 Fri Nov  9 20:52:24 2012
@@ -910,6 +910,53 @@ public class TestFileCreation {
       dfs.close();
     } finally {
       System.out.println("testFsClose successful");
+      cluster.shutdown();
+    }
+  }
+
+  // test closing file after cluster is shutdown
+  public void testFsCloseAfterClusterShutdown() throws IOException {
+    System.out.println("test testFsCloseAfterClusterShutdown start");
+    final int DATANODE_NUM = 3;
+
+    Configuration conf = new Configuration();
+    conf.setInt("dfs.replication.min", 3);
+    conf.setBoolean("ipc.client.ping", false); // hdfs timeout is default 60 
seconds
+    conf.setInt("ipc.ping.interval", 10000); // hdfs timeout is now 10 second
+
+    // create cluster
+    MiniDFSCluster cluster = new MiniDFSCluster(conf, DATANODE_NUM, true, 
null);
+    DistributedFileSystem dfs = null;
+    try {
+      cluster.waitActive();
+      dfs = (DistributedFileSystem)cluster.getFileSystem();
+
+      // create a new file.
+      final String f = DIR + "dhrubashutdown";
+      final Path fpath = new Path(f);
+      FSDataOutputStream out = TestFileCreation.createFile(dfs, fpath, 
DATANODE_NUM);
+      out.write("something_dhruba".getBytes());
+      out.sync();    // ensure that block is allocated
+
+      // shutdown last datanode in pipeline.
+      cluster.stopDataNode(2);
+
+      // close file. Since we have set the minReplcatio to 3 but have killed 
one
+      // of the three datanodes, the close call will loop until the 
hdfsTimeout is
+      // encountered.
+      boolean hasException = false;
+      try {
+        out.close();
+        System.out.println("testFsCloseAfterClusterShutdown: Error here");
+      } catch (IOException e) {
+        hasException = true;
+      }
+      assertTrue("Failed to close file after cluster shutdown", hasException);
+    } finally {
+      System.out.println("testFsCloseAfterClusterShutdown successful");
+      if (cluster != null) {
+        cluster.shutdown();
+      }
     }
   }
 }

Modified: 
hadoop/common/branches/branch-1.1/src/test/org/apache/hadoop/hdfs/TestLease.java
URL: 
http://svn.apache.org/viewvc/hadoop/common/branches/branch-1.1/src/test/org/apache/hadoop/hdfs/TestLease.java?rev=1407626&r1=1407625&r2=1407626&view=diff
==============================================================================
--- 
hadoop/common/branches/branch-1.1/src/test/org/apache/hadoop/hdfs/TestLease.java
 (original)
+++ 
hadoop/common/branches/branch-1.1/src/test/org/apache/hadoop/hdfs/TestLease.java
 Fri Nov  9 20:52:24 2012
@@ -17,25 +17,34 @@
  */
 package org.apache.hadoop.hdfs;
 
-import java.io.*;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.security.PrivilegedExceptionAction;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.protocol.ClientProtocol;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.junit.Assert;
+import org.junit.Test;
+import org.mockito.Mockito;
 
-public class TestLease extends junit.framework.TestCase {
+public class TestLease {
   static boolean hasLease(MiniDFSCluster cluster, Path src) {
     return 
cluster.getNameNode().namesystem.leaseManager.getLeaseByPath(src.toString()) != 
null;
   }
   
   final Path dir = new Path("/test/lease/");
 
+  @Test
   public void testLease() throws Exception {
     Configuration conf = new Configuration();
     MiniDFSCluster cluster = new MiniDFSCluster(conf, 2, true, null);
     try {
       FileSystem fs = cluster.getFileSystem();
-      assertTrue(fs.mkdirs(dir));
+      Assert.assertTrue(fs.mkdirs(dir));
       
       Path a = new Path(dir, "a");
       Path b = new Path(dir, "b");
@@ -43,24 +52,66 @@ public class TestLease extends junit.fra
       DataOutputStream a_out = fs.create(a);
       a_out.writeBytes("something");
 
-      assertTrue(hasLease(cluster, a));
-      assertTrue(!hasLease(cluster, b));
+      Assert.assertTrue(hasLease(cluster, a));
+      Assert.assertTrue(!hasLease(cluster, b));
       
       DataOutputStream b_out = fs.create(b);
       b_out.writeBytes("something");
 
-      assertTrue(hasLease(cluster, a));
-      assertTrue(hasLease(cluster, b));
+      Assert.assertTrue(hasLease(cluster, a));
+      Assert.assertTrue(hasLease(cluster, b));
 
       a_out.close();
       b_out.close();
 
-      assertTrue(!hasLease(cluster, a));
-      assertTrue(!hasLease(cluster, b));
+      Assert.assertTrue(!hasLease(cluster, a));
+      Assert.assertTrue(!hasLease(cluster, b));
       
       fs.delete(dir, true);
     } finally {
       if (cluster != null) {cluster.shutdown();}
     }
   }
+
+  @Test
+  public void testFactory() throws Exception {
+    final String[] groups = new String[]{"supergroup"};
+    final UserGroupInformation[] ugi = new UserGroupInformation[3];
+    for(int i = 0; i < ugi.length; i++) {
+      ugi[i] = UserGroupInformation.createUserForTesting("user" + i, groups);
+    }
+
+    final Configuration conf = new Configuration();
+    final DFSClient c1 = createDFSClientAs(ugi[0], conf);
+    FSDataOutputStream out1 = createFsOut(c1, "/out1");
+    final DFSClient c2 = createDFSClientAs(ugi[0], conf);
+    FSDataOutputStream out2 = createFsOut(c2, "/out2");
+    Assert.assertEquals(c1.getLeaseRenewer(), c2.getLeaseRenewer());
+    final DFSClient c3 = createDFSClientAs(ugi[1], conf);
+    FSDataOutputStream out3 = createFsOut(c3, "/out3");
+    Assert.assertTrue(c1.getLeaseRenewer() != c3.getLeaseRenewer());
+    final DFSClient c4 = createDFSClientAs(ugi[1], conf);
+    FSDataOutputStream out4 = createFsOut(c4, "/out4");
+    Assert.assertEquals(c3.getLeaseRenewer(), c4.getLeaseRenewer());
+    final DFSClient c5 = createDFSClientAs(ugi[2], conf);
+    FSDataOutputStream out5 = createFsOut(c5, "/out5");
+    Assert.assertTrue(c1.getLeaseRenewer() != c5.getLeaseRenewer());
+    Assert.assertTrue(c3.getLeaseRenewer() != c5.getLeaseRenewer());
+  }
+
+  private FSDataOutputStream createFsOut(DFSClient dfs, String path)
+      throws IOException {
+    return new FSDataOutputStream(dfs.create(path, true), null);
+  }
+  
+  static final ClientProtocol mcp = Mockito.mock(ClientProtocol.class);
+  static public DFSClient createDFSClientAs(UserGroupInformation ugi, 
+      final Configuration conf) throws Exception {
+    return ugi.doAs(new PrivilegedExceptionAction<DFSClient>() {
+      @Override
+      public DFSClient run() throws Exception {
+        return new DFSClient(null, mcp, conf, null);
+      }
+    });
+  }
 }

Modified: 
hadoop/common/branches/branch-1.1/src/test/org/apache/hadoop/hdfs/TestLeaseRecovery2.java
URL: 
http://svn.apache.org/viewvc/hadoop/common/branches/branch-1.1/src/test/org/apache/hadoop/hdfs/TestLeaseRecovery2.java?rev=1407626&r1=1407625&r2=1407626&view=diff
==============================================================================
--- 
hadoop/common/branches/branch-1.1/src/test/org/apache/hadoop/hdfs/TestLeaseRecovery2.java
 (original)
+++ 
hadoop/common/branches/branch-1.1/src/test/org/apache/hadoop/hdfs/TestLeaseRecovery2.java
 Fri Nov  9 20:52:24 2012
@@ -157,7 +157,7 @@ public class TestLeaseRecovery2 extends 
     stm.sync();
     if (triggerSoftLease) {
       AppendTestUtil.LOG.info("leasechecker.interruptAndJoin()");
-      dfs.dfs.leasechecker.interruptAndJoin();
+      dfs.dfs.getLeaseRenewer().interruptAndJoin();
     }
     return filepath;
   }


Reply via email to