Author: eli
Date: Sun Aug 12 19:37:34 2012
New Revision: 1372187

URL: http://svn.apache.org/viewvc?rev=1372187&view=rev
Log:
HDFS-3754. BlockSender doesn't shutdown ReadaheadPool threads. Contributed by Eli Collins

Modified:
    hadoop/common/branches/branch-1/CHANGES.txt
    hadoop/common/branches/branch-1/src/core/org/apache/hadoop/io/ReadaheadPool.java
    hadoop/common/branches/branch-1/src/hdfs/org/apache/hadoop/hdfs/server/datanode/BlockSender.java
    hadoop/common/branches/branch-1/src/hdfs/org/apache/hadoop/hdfs/server/datanode/DataNode.java

Modified: hadoop/common/branches/branch-1/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/CHANGES.txt?rev=1372187&r1=1372186&r2=1372187&view=diff
==============================================================================
--- hadoop/common/branches/branch-1/CHANGES.txt (original)
+++ hadoop/common/branches/branch-1/CHANGES.txt Sun Aug 12 19:37:34 2012
@@ -161,6 +161,8 @@ Release 1.2.0 - unreleased
 
     MAPREDUCE-4036. Streaming TestUlimit fails on CentOS 6. (tucu)
 
+    HDFS-3754. BlockSender doesn't shutdown ReadaheadPool threads. (eli)
+
 Release 1.1.0 - unreleased
 
   INCOMPATIBLE CHANGES

Modified: hadoop/common/branches/branch-1/src/core/org/apache/hadoop/io/ReadaheadPool.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/core/org/apache/hadoop/io/ReadaheadPool.java?rev=1372187&r1=1372186&r2=1372187&view=diff
==============================================================================
--- hadoop/common/branches/branch-1/src/core/org/apache/hadoop/io/ReadaheadPool.java (original)
+++ hadoop/common/branches/branch-1/src/core/org/apache/hadoop/io/ReadaheadPool.java Sun Aug 12 19:37:34 2012
@@ -60,13 +60,14 @@ public class ReadaheadPool {
   }
 
   private ReadaheadPool() {
+    final ThreadFactory backingFactory = Executors.defaultThreadFactory();
     pool = new ThreadPoolExecutor(POOL_SIZE, MAX_POOL_SIZE, 3L, TimeUnit.SECONDS,
         new ArrayBlockingQueue<Runnable>(CAPACITY));
     pool.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardOldestPolicy());
     pool.setThreadFactory(new ThreadFactory() {
       @Override
       public Thread newThread(Runnable runnable) {
-        Thread thread = Executors.defaultThreadFactory().newThread(runnable);
+        Thread thread = backingFactory.newThread(runnable);
         thread.setName(String.format("Readahead Thread #%d",
             THREAD_COUNTER.getAndIncrement()));
         thread.setDaemon(true);
@@ -246,4 +247,4 @@ public class ReadaheadPool {
           + ", off=" + off + ", len=" + len + "]";
     }
   }
-}
\ No newline at end of file
+}
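
The hunk above boils down to one pattern: build the executor with a thread factory that delegates to a single captured Executors.defaultThreadFactory(), names each worker, and marks it as a daemon so the pool cannot keep a JVM alive on its own. Below is a minimal, self-contained sketch of that pattern; the pool sizes and queue capacity are illustrative stand-ins for the POOL_SIZE, MAX_POOL_SIZE and CAPACITY constants, which are not shown in this diff.

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class DaemonReadaheadPoolSketch {
  private static final AtomicInteger THREAD_COUNTER = new AtomicInteger();

  public static void main(String[] args) throws InterruptedException {
    // Capture the backing factory once, instead of calling
    // Executors.defaultThreadFactory() inside every newThread() call.
    final ThreadFactory backingFactory = Executors.defaultThreadFactory();

    ThreadPoolExecutor pool = new ThreadPoolExecutor(
        4, 16, 3L, TimeUnit.SECONDS,             // illustrative sizes
        new ArrayBlockingQueue<Runnable>(1024));  // illustrative capacity
    pool.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardOldestPolicy());
    pool.setThreadFactory(new ThreadFactory() {
      @Override
      public Thread newThread(Runnable runnable) {
        Thread thread = backingFactory.newThread(runnable);
        thread.setName(String.format("Readahead Thread #%d",
            THREAD_COUNTER.getAndIncrement()));
        // Daemon workers do not prevent the JVM from exiting once all
        // non-daemon threads are done, even if the pool is never shut down.
        thread.setDaemon(true);
        return thread;
      }
    });

    pool.execute(new Runnable() {
      @Override
      public void run() {
        System.out.println(Thread.currentThread().getName() + " ran a task");
      }
    });

    Thread.sleep(100);  // give the task a moment to run
    // main() returns here and the JVM exits: the only remaining pool
    // threads are daemons, so nothing pins the process.
  }
}
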

Modified: hadoop/common/branches/branch-1/src/hdfs/org/apache/hadoop/hdfs/server/datanode/BlockSender.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/hdfs/org/apache/hadoop/hdfs/server/datanode/BlockSender.java?rev=1372187&r1=1372186&r2=1372187&view=diff
==============================================================================
--- hadoop/common/branches/branch-1/src/hdfs/org/apache/hadoop/hdfs/server/datanode/BlockSender.java (original)
+++ hadoop/common/branches/branch-1/src/hdfs/org/apache/hadoop/hdfs/server/datanode/BlockSender.java Sun Aug 12 19:37:34 2012
@@ -86,6 +86,7 @@ class BlockSender implements java.io.Clo
   // Cache-management related fields
   private final long readaheadLength;
   private boolean shouldDropCacheBehindRead;
+  private ReadaheadPool readaheadPool;
   private ReadaheadRequest curReadahead;
   private long lastCacheDropOffset;
   private static final long CACHE_DROP_INTERVAL_BYTES = 1024 * 1024; // 1MB
@@ -95,8 +96,6 @@ class BlockSender implements java.io.Clo
    */
   private static final long LONG_READ_THRESHOLD_BYTES = 256 * 1024;
 
-  private static ReadaheadPool readaheadPool = ReadaheadPool.getInstance();
-  
   BlockSender(Block block, long startOffset, long length,
               boolean corruptChecksumOk, boolean chunkOffsetOK,
               boolean verifyChecksum, DataNode datanode) throws IOException {
@@ -117,6 +116,7 @@ class BlockSender implements java.io.Clo
       this.transferToAllowed = datanode.transferToAllowed;
       this.clientTraceFmt = clientTraceFmt;
       this.readaheadLength = datanode.getReadaheadLength();
+      this.readaheadPool = datanode.readaheadPool;
       this.shouldDropCacheBehindRead = datanode.shouldDropCacheBehindReads();
       
       if ( !corruptChecksumOk || datanode.data.metaFileExists(block) ) {

Modified: hadoop/common/branches/branch-1/src/hdfs/org/apache/hadoop/hdfs/server/datanode/DataNode.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/hdfs/org/apache/hadoop/hdfs/server/datanode/DataNode.java?rev=1372187&r1=1372186&r2=1372187&view=diff
==============================================================================
--- hadoop/common/branches/branch-1/src/hdfs/org/apache/hadoop/hdfs/server/datanode/DataNode.java (original)
+++ hadoop/common/branches/branch-1/src/hdfs/org/apache/hadoop/hdfs/server/datanode/DataNode.java Sun Aug 12 19:37:34 2012
@@ -107,6 +107,7 @@ import org.apache.hadoop.hdfs.web.WebHdf
 import org.apache.hadoop.hdfs.web.resources.Param;
 import org.apache.hadoop.http.HttpServer;
 import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.io.ReadaheadPool;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.ipc.RPC;
 import org.apache.hadoop.ipc.RemoteException;
@@ -270,6 +271,8 @@ public class DataNode extends Configured
   public Server ipcServer;
 
   private SecureResources secureResources = null;
+
+  ReadaheadPool readaheadPool;
   
   /**
    * Current system time.
@@ -469,6 +472,8 @@ public class DataNode extends Configured
                reason + ".");
     }
 
+    readaheadPool = ReadaheadPool.getInstance();
+
     this.connectToDnViaHostname = conf.getBoolean(
         DFSConfigKeys.DFS_DATANODE_USE_DN_HOSTNAME,
         DFSConfigKeys.DFS_DATANODE_USE_DN_HOSTNAME_DEFAULT);
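
Taken together, the BlockSender and DataNode hunks above move the ReadaheadPool reference out of a static field in BlockSender and into an instance field that the DataNode obtains and hands to each sender. The following is a rough sketch of that shape only, using simplified hypothetical stand-in classes rather than the real Hadoop types.

// Hypothetical stand-ins, to show the wiring change rather than real behavior.
final class ReadaheadPoolSketch {
  private static ReadaheadPoolSketch instance;

  static synchronized ReadaheadPoolSketch getInstance() {
    if (instance == null) {
      instance = new ReadaheadPoolSketch();
    }
    return instance;
  }

  void readaheadStream(String streamId) {
    System.out.println("readahead scheduled for " + streamId);
  }
}

final class DataNodeSketch {
  // The node, not the sender, now holds the reference to the shared pool,
  // mirroring the "readaheadPool = ReadaheadPool.getInstance()" line added
  // in the DataNode hunk above.
  final ReadaheadPoolSketch readaheadPool = ReadaheadPoolSketch.getInstance();
}

final class BlockSenderSketch {
  // Instance field populated from the DataNode, replacing the old
  // "private static ReadaheadPool readaheadPool = ReadaheadPool.getInstance();".
  private final ReadaheadPoolSketch readaheadPool;

  BlockSenderSketch(DataNodeSketch datanode) {
    this.readaheadPool = datanode.readaheadPool;
  }

  void sendBlock(String blockId) {
    readaheadPool.readaheadStream(blockId);
  }
}

class ReadaheadWiringDemo {
  public static void main(String[] args) {
    DataNodeSketch datanode = new DataNodeSketch();
    new BlockSenderSketch(datanode).sendBlock("blk_1");
  }
}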

