Author: eli Date: Sun Aug 12 19:37:34 2012 New Revision: 1372187 URL: http://svn.apache.org/viewvc?rev=1372187&view=rev Log: HDFS-3754. BlockSender doesn't shutdown ReadaheadPool threads. Contributed by Eli Collins
Modified: hadoop/common/branches/branch-1/CHANGES.txt hadoop/common/branches/branch-1/src/core/org/apache/hadoop/io/ReadaheadPool.java hadoop/common/branches/branch-1/src/hdfs/org/apache/hadoop/hdfs/server/datanode/BlockSender.java hadoop/common/branches/branch-1/src/hdfs/org/apache/hadoop/hdfs/server/datanode/DataNode.java Modified: hadoop/common/branches/branch-1/CHANGES.txt URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/CHANGES.txt?rev=1372187&r1=1372186&r2=1372187&view=diff ============================================================================== --- hadoop/common/branches/branch-1/CHANGES.txt (original) +++ hadoop/common/branches/branch-1/CHANGES.txt Sun Aug 12 19:37:34 2012 @@ -161,6 +161,8 @@ Release 1.2.0 - unreleased MAPREDUCE-4036. Streaming TestUlimit fails on CentOS 6. (tucu) + HDFS-3754. BlockSender doesn't shutdown ReadaheadPool threads. (eli) + Release 1.1.0 - unreleased INCOMPATIBLE CHANGES Modified: hadoop/common/branches/branch-1/src/core/org/apache/hadoop/io/ReadaheadPool.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/core/org/apache/hadoop/io/ReadaheadPool.java?rev=1372187&r1=1372186&r2=1372187&view=diff ============================================================================== --- hadoop/common/branches/branch-1/src/core/org/apache/hadoop/io/ReadaheadPool.java (original) +++ hadoop/common/branches/branch-1/src/core/org/apache/hadoop/io/ReadaheadPool.java Sun Aug 12 19:37:34 2012 @@ -60,13 +60,14 @@ public class ReadaheadPool { } private ReadaheadPool() { + final ThreadFactory backingFactory = Executors.defaultThreadFactory(); pool = new ThreadPoolExecutor(POOL_SIZE, MAX_POOL_SIZE, 3L, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(CAPACITY)); pool.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardOldestPolicy()); pool.setThreadFactory(new ThreadFactory() { @Override public Thread newThread(Runnable runnable) { - Thread thread = Executors.defaultThreadFactory().newThread(runnable); + Thread thread = backingFactory.newThread(runnable); thread.setName(String.format("Readahead Thread #%d", THREAD_COUNTER.getAndIncrement())); thread.setDaemon(true); @@ -246,4 +247,4 @@ public class ReadaheadPool { + ", off=" + off + ", len=" + len + "]"; } } -} \ No newline at end of file +} Modified: hadoop/common/branches/branch-1/src/hdfs/org/apache/hadoop/hdfs/server/datanode/BlockSender.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/hdfs/org/apache/hadoop/hdfs/server/datanode/BlockSender.java?rev=1372187&r1=1372186&r2=1372187&view=diff ============================================================================== --- hadoop/common/branches/branch-1/src/hdfs/org/apache/hadoop/hdfs/server/datanode/BlockSender.java (original) +++ hadoop/common/branches/branch-1/src/hdfs/org/apache/hadoop/hdfs/server/datanode/BlockSender.java Sun Aug 12 19:37:34 2012 @@ -86,6 +86,7 @@ class BlockSender implements java.io.Clo // Cache-management related fields private final long readaheadLength; private boolean shouldDropCacheBehindRead; + private ReadaheadPool readaheadPool; private ReadaheadRequest curReadahead; private long lastCacheDropOffset; private static final long CACHE_DROP_INTERVAL_BYTES = 1024 * 1024; // 1MB @@ -95,8 +96,6 @@ class BlockSender implements java.io.Clo */ private static final long LONG_READ_THRESHOLD_BYTES = 256 * 1024; - private static ReadaheadPool readaheadPool = ReadaheadPool.getInstance(); - BlockSender(Block block, long startOffset, long length, boolean corruptChecksumOk, boolean chunkOffsetOK, boolean verifyChecksum, DataNode datanode) throws IOException { @@ -117,6 +116,7 @@ class BlockSender implements java.io.Clo this.transferToAllowed = datanode.transferToAllowed; this.clientTraceFmt = clientTraceFmt; this.readaheadLength = datanode.getReadaheadLength(); + this.readaheadPool = datanode.readaheadPool; this.shouldDropCacheBehindRead = datanode.shouldDropCacheBehindReads(); if ( !corruptChecksumOk || datanode.data.metaFileExists(block) ) { Modified: hadoop/common/branches/branch-1/src/hdfs/org/apache/hadoop/hdfs/server/datanode/DataNode.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/hdfs/org/apache/hadoop/hdfs/server/datanode/DataNode.java?rev=1372187&r1=1372186&r2=1372187&view=diff ============================================================================== --- hadoop/common/branches/branch-1/src/hdfs/org/apache/hadoop/hdfs/server/datanode/DataNode.java (original) +++ hadoop/common/branches/branch-1/src/hdfs/org/apache/hadoop/hdfs/server/datanode/DataNode.java Sun Aug 12 19:37:34 2012 @@ -107,6 +107,7 @@ import org.apache.hadoop.hdfs.web.WebHdf import org.apache.hadoop.hdfs.web.resources.Param; import org.apache.hadoop.http.HttpServer; import org.apache.hadoop.io.IOUtils; +import org.apache.hadoop.io.ReadaheadPool; import org.apache.hadoop.io.Text; import org.apache.hadoop.ipc.RPC; import org.apache.hadoop.ipc.RemoteException; @@ -270,6 +271,8 @@ public class DataNode extends Configured public Server ipcServer; private SecureResources secureResources = null; + + ReadaheadPool readaheadPool; /** * Current system time. @@ -469,6 +472,8 @@ public class DataNode extends Configured reason + "."); } + readaheadPool = ReadaheadPool.getInstance(); + this.connectToDnViaHostname = conf.getBoolean( DFSConfigKeys.DFS_DATANODE_USE_DN_HOSTNAME, DFSConfigKeys.DFS_DATANODE_USE_DN_HOSTNAME_DEFAULT);