Author: cmccabe
Date: Thu Mar  6 08:01:41 2014
New Revision: 1574789

URL: http://svn.apache.org/r1574789
Log:
HDFS-6057. DomainSocketWatcher.watcherThread should be marked as a daemon 
thread (cmccabe)

Modified:
    hadoop/common/branches/branch-2/hadoop-hdfs-project/   (props changed)
    hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/   (props 
changed)
    hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
    
hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/  
 (props changed)
    
hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/client/DfsClientShmManager.java
    
hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/client/ShortCircuitCache.java
    
hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestBlockReaderFactory.java

Propchange: hadoop/common/branches/branch-2/hadoop-hdfs-project/
------------------------------------------------------------------------------
  Merged /hadoop/common/trunk/hadoop-hdfs-project:r1574787

Propchange: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/
------------------------------------------------------------------------------
  Merged /hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs:r1574787

Modified: 
hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
URL: 
http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt?rev=1574789&r1=1574788&r2=1574789&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt 
(original)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt 
Thu Mar  6 08:01:41 2014
@@ -303,6 +303,9 @@ Release 2.4.0 - UNRELEASED
     HDFS-5898. Allow NFS gateway to login/relogin from its kerberos keytab.
     (Abin Shahab via atm)
 
+    HDFS-6057. DomainSocketWatcher.watcherThread should be marked as daemon
+    thread (cmccabe)
+
   BREAKDOWN OF HDFS-5698 SUBTASKS AND RELATED JIRAS
 
     HDFS-5717. Save FSImage header in protobuf. (Haohui Mai via jing9)

Propchange: 
hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/
------------------------------------------------------------------------------
  Merged 
/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java:r1574787

Modified: 
hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/client/DfsClientShmManager.java
URL: 
http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/client/DfsClientShmManager.java?rev=1574789&r1=1574788&r2=1574789&view=diff
==============================================================================
--- 
hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/client/DfsClientShmManager.java
 (original)
+++ 
hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/client/DfsClientShmManager.java
 Thu Mar  6 08:01:41 2014
@@ -21,6 +21,7 @@ import com.google.common.annotations.Vis
 import com.google.common.base.Preconditions;
 
 import java.io.BufferedOutputStream;
+import java.io.Closeable;
 import java.io.DataOutputStream;
 import java.io.EOFException;
 import java.io.FileInputStream;
@@ -59,7 +60,7 @@ import org.apache.hadoop.classification.
  * See {@link ShortCircuitRegistry} for more information on the communication 
protocol.
  */
 @InterfaceAudience.Private
-public class DfsClientShmManager {
+public class DfsClientShmManager implements Closeable {
   private static final Log LOG = LogFactory.getLog(DfsClientShmManager.class);
 
   /**
@@ -225,6 +226,12 @@ public class DfsClientShmManager {
     Slot allocSlot(DomainPeer peer, MutableBoolean usedPeer,
         String clientName, ExtendedBlockId blockId) throws IOException {
       while (true) {
+        if (closed) {
+          if (LOG.isTraceEnabled()) {
+            LOG.trace(this + ": the DfsClientShmManager has been closed.");
+          }
+          return null;
+        }
         if (disabled) {
           if (LOG.isTraceEnabled()) {
             LOG.trace(this + ": shared memory segment access is disabled.");
@@ -374,6 +381,8 @@ public class DfsClientShmManager {
     }
   }
 
+  private boolean closed = false;
+
   private final ReentrantLock lock = new ReentrantLock();
 
   /**
@@ -409,6 +418,10 @@ public class DfsClientShmManager {
       String clientName) throws IOException {
     lock.lock();
     try {
+      if (closed) {
+        LOG.trace(this + ": the DfsClientShmManager isclosed.");
+        return null;
+      }
       EndpointShmManager shmManager = datanodes.get(datanode);
       if (shmManager == null) {
         shmManager = new EndpointShmManager(datanode);
@@ -466,9 +479,32 @@ public class DfsClientShmManager {
     }
   }
 
+  /**
+   * Close the DfsClientShmManager.
+   */
+  @Override
+  public void close() throws IOException {
+    lock.lock();
+    try {
+      if (closed) return;
+      closed = true;
+    } finally {
+      lock.unlock();
+    }
+    // When closed, the domainSocketWatcher will issue callbacks that mark
+    // all the outstanding DfsClientShm segments as stale.
+    IOUtils.cleanup(LOG, domainSocketWatcher);
+  }
+
+
   @Override
   public String toString() {
     return String.format("ShortCircuitShmManager(%08x)",
         System.identityHashCode(this));
   }
+
+  @VisibleForTesting
+  public DomainSocketWatcher getDomainSocketWatcher() {
+    return domainSocketWatcher;
+  }
 }

Modified: 
hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/client/ShortCircuitCache.java
URL: 
http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/client/ShortCircuitCache.java?rev=1574789&r1=1574788&r2=1574789&view=diff
==============================================================================
--- 
hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/client/ShortCircuitCache.java
 (original)
+++ 
hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/client/ShortCircuitCache.java
 Thu Mar  6 08:01:41 2014
@@ -887,6 +887,7 @@ public class ShortCircuitCache implement
   /**
    * Close the cache and free all associated resources.
    */
+  @Override
   public void close() {
     try {
       lock.lock();
@@ -911,6 +912,7 @@ public class ShortCircuitCache implement
     } finally {
       lock.unlock();
     }
+    IOUtils.cleanup(LOG, shmManager);
   }
 
   @VisibleForTesting // ONLY for testing

Modified: 
hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestBlockReaderFactory.java
URL: 
http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestBlockReaderFactory.java?rev=1574789&r1=1574788&r2=1574789&view=diff
==============================================================================
--- 
hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestBlockReaderFactory.java
 (original)
+++ 
hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestBlockReaderFactory.java
 Thu Mar  6 08:01:41 2014
@@ -376,4 +376,37 @@ public class TestBlockReaderFactory {
     Assert.assertEquals(null, cache.getDfsClientShmManager());
     cluster.shutdown();
   }
+  
+  /**
+   * Test shutting down the ShortCircuitCache while there are things in it.
+   */
+  @Test
+  public void testShortCircuitCacheShutdown() throws Exception {
+    TemporarySocketDirectory sockDir = new TemporarySocketDirectory();
+    Configuration conf = createShortCircuitConf(
+        "testShortCircuitCacheShutdown", sockDir);
+    conf.set(DFS_CLIENT_CONTEXT, "testShortCircuitCacheShutdown");
+    Configuration serverConf = new Configuration(conf);
+    DFSInputStream.tcpReadsDisabledForTesting = true;
+    final MiniDFSCluster cluster =
+        new MiniDFSCluster.Builder(serverConf).numDataNodes(1).build();
+    cluster.waitActive();
+    final DistributedFileSystem fs =
+        (DistributedFileSystem)FileSystem.get(cluster.getURI(0), conf);
+    final String TEST_FILE = "/test_file";
+    final int TEST_FILE_LEN = 4000;
+    final int SEED = 0xFADEC;
+    DFSTestUtil.createFile(fs, new Path(TEST_FILE), TEST_FILE_LEN,
+        (short)1, SEED);
+    byte contents[] = DFSTestUtil.readFileBuffer(fs, new Path(TEST_FILE));
+    byte expected[] = DFSTestUtil.
+        calculateFileContentsFromSeed(SEED, TEST_FILE_LEN);
+    Assert.assertTrue(Arrays.equals(contents, expected));
+    final ShortCircuitCache cache =
+        fs.dfs.getClientContext().getShortCircuitCache();
+    cache.close();
+    Assert.assertTrue(cache.getDfsClientShmManager().
+        getDomainSocketWatcher().isClosed());
+    cluster.shutdown();
+  }
 }


Reply via email to