Author: cmccabe Date: Thu Mar 6 07:57:19 2014 New Revision: 1574787 URL: http://svn.apache.org/r1574787 Log: HDFS-6057. DomainSocketWatcher.watcherThread should be marked as a daemon thread (cmccabe)
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/client/DfsClientShmManager.java hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/client/ShortCircuitCache.java hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestBlockReaderFactory.java Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt?rev=1574787&r1=1574786&r2=1574787&view=diff ============================================================================== --- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt (original) +++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt Thu Mar 6 07:57:19 2014 @@ -689,6 +689,9 @@ Release 2.4.0 - UNRELEASED HDFS-5898. Allow NFS gateway to login/relogin from its kerberos keytab. (Abin Shahab via atm) + HDFS-6057. DomainSocketWatcher.watcherThread should be marked as daemon + thread (cmccabe) + BREAKDOWN OF HDFS-5698 SUBTASKS AND RELATED JIRAS HDFS-5717. Save FSImage header in protobuf. (Haohui Mai via jing9) Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/client/DfsClientShmManager.java URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/client/DfsClientShmManager.java?rev=1574787&r1=1574786&r2=1574787&view=diff ============================================================================== --- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/client/DfsClientShmManager.java (original) +++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/client/DfsClientShmManager.java Thu Mar 6 07:57:19 2014 @@ -21,6 +21,7 @@ import com.google.common.annotations.Vis import com.google.common.base.Preconditions; import java.io.BufferedOutputStream; +import java.io.Closeable; import java.io.DataOutputStream; import java.io.EOFException; import java.io.FileInputStream; @@ -59,7 +60,7 @@ import org.apache.hadoop.classification. * See {@link ShortCircuitRegistry} for more information on the communication protocol. */ @InterfaceAudience.Private -public class DfsClientShmManager { +public class DfsClientShmManager implements Closeable { private static final Log LOG = LogFactory.getLog(DfsClientShmManager.class); /** @@ -225,6 +226,12 @@ public class DfsClientShmManager { Slot allocSlot(DomainPeer peer, MutableBoolean usedPeer, String clientName, ExtendedBlockId blockId) throws IOException { while (true) { + if (closed) { + if (LOG.isTraceEnabled()) { + LOG.trace(this + ": the DfsClientShmManager has been closed."); + } + return null; + } if (disabled) { if (LOG.isTraceEnabled()) { LOG.trace(this + ": shared memory segment access is disabled."); @@ -374,6 +381,8 @@ public class DfsClientShmManager { } } + private boolean closed = false; + private final ReentrantLock lock = new ReentrantLock(); /** @@ -409,6 +418,10 @@ public class DfsClientShmManager { String clientName) throws IOException { lock.lock(); try { + if (closed) { + LOG.trace(this + ": the DfsClientShmManager isclosed."); + return null; + } EndpointShmManager shmManager = datanodes.get(datanode); if (shmManager == null) { shmManager = new EndpointShmManager(datanode); @@ -466,9 +479,32 @@ public class DfsClientShmManager { } } + /** + * Close the DfsClientShmManager. + */ + @Override + public void close() throws IOException { + lock.lock(); + try { + if (closed) return; + closed = true; + } finally { + lock.unlock(); + } + // When closed, the domainSocketWatcher will issue callbacks that mark + // all the outstanding DfsClientShm segments as stale. + IOUtils.cleanup(LOG, domainSocketWatcher); + } + + @Override public String toString() { return String.format("ShortCircuitShmManager(%08x)", System.identityHashCode(this)); } + + @VisibleForTesting + public DomainSocketWatcher getDomainSocketWatcher() { + return domainSocketWatcher; + } } Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/client/ShortCircuitCache.java URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/client/ShortCircuitCache.java?rev=1574787&r1=1574786&r2=1574787&view=diff ============================================================================== --- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/client/ShortCircuitCache.java (original) +++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/client/ShortCircuitCache.java Thu Mar 6 07:57:19 2014 @@ -887,6 +887,7 @@ public class ShortCircuitCache implement /** * Close the cache and free all associated resources. */ + @Override public void close() { try { lock.lock(); @@ -911,6 +912,7 @@ public class ShortCircuitCache implement } finally { lock.unlock(); } + IOUtils.cleanup(LOG, shmManager); } @VisibleForTesting // ONLY for testing Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestBlockReaderFactory.java URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestBlockReaderFactory.java?rev=1574787&r1=1574786&r2=1574787&view=diff ============================================================================== --- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestBlockReaderFactory.java (original) +++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestBlockReaderFactory.java Thu Mar 6 07:57:19 2014 @@ -376,4 +376,37 @@ public class TestBlockReaderFactory { Assert.assertEquals(null, cache.getDfsClientShmManager()); cluster.shutdown(); } + + /** + * Test shutting down the ShortCircuitCache while there are things in it. + */ + @Test + public void testShortCircuitCacheShutdown() throws Exception { + TemporarySocketDirectory sockDir = new TemporarySocketDirectory(); + Configuration conf = createShortCircuitConf( + "testShortCircuitCacheShutdown", sockDir); + conf.set(DFS_CLIENT_CONTEXT, "testShortCircuitCacheShutdown"); + Configuration serverConf = new Configuration(conf); + DFSInputStream.tcpReadsDisabledForTesting = true; + final MiniDFSCluster cluster = + new MiniDFSCluster.Builder(serverConf).numDataNodes(1).build(); + cluster.waitActive(); + final DistributedFileSystem fs = + (DistributedFileSystem)FileSystem.get(cluster.getURI(0), conf); + final String TEST_FILE = "/test_file"; + final int TEST_FILE_LEN = 4000; + final int SEED = 0xFADEC; + DFSTestUtil.createFile(fs, new Path(TEST_FILE), TEST_FILE_LEN, + (short)1, SEED); + byte contents[] = DFSTestUtil.readFileBuffer(fs, new Path(TEST_FILE)); + byte expected[] = DFSTestUtil. + calculateFileContentsFromSeed(SEED, TEST_FILE_LEN); + Assert.assertTrue(Arrays.equals(contents, expected)); + final ShortCircuitCache cache = + fs.dfs.getClientContext().getShortCircuitCache(); + cache.close(); + Assert.assertTrue(cache.getDfsClientShmManager(). + getDomainSocketWatcher().isClosed()); + cluster.shutdown(); + } }