spark git commit: [SPARK-11999][CORE] Fix the issue that ThreadUtils.newDaemonCachedThreadPool doesn't cache any task
Repository: spark Updated Branches: refs/heads/branch-1.5 b1fcefca6 -> 7900d192e [SPARK-11999][CORE] Fix the issue that ThreadUtils.newDaemonCachedThreadPool doesn't cache any task In the previous code, `newDaemonCachedThreadPool` uses `SynchronousQueue`, which is wrong. `SynchronousQueue` is an empty queue that cannot cache any task. This patch uses `LinkedBlockingQueue` to fix it along with other fixes to make sure `newDaemonCachedThreadPool` can use at most `maxThreadNumber` threads, and after that, cache tasks to `LinkedBlockingQueue`. Author: Shixiong Zhu Closes #9978 from zsxwing/cached-threadpool. (cherry picked from commit d3ef693325f91a1ed340c9756c81244a80398eb2) Signed-off-by: Shixiong Zhu Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/7900d192 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/7900d192 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/7900d192 Branch: refs/heads/branch-1.5 Commit: 7900d192e8adf501fbed0d0704d60d2c0e63a764 Parents: b1fcefc Author: Shixiong Zhu Authored: Wed Nov 25 23:31:21 2015 -0800 Committer: Shixiong Zhu Committed: Wed Nov 25 23:31:53 2015 -0800 -- .../org/apache/spark/util/ThreadUtils.scala | 14 -- .../apache/spark/util/ThreadUtilsSuite.scala| 45 2 files changed, 56 insertions(+), 3 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/7900d192/core/src/main/scala/org/apache/spark/util/ThreadUtils.scala -- diff --git a/core/src/main/scala/org/apache/spark/util/ThreadUtils.scala b/core/src/main/scala/org/apache/spark/util/ThreadUtils.scala index 06976f8..3159ef7 100644 --- a/core/src/main/scala/org/apache/spark/util/ThreadUtils.scala +++ b/core/src/main/scala/org/apache/spark/util/ThreadUtils.scala @@ -57,10 +57,18 @@ private[spark] object ThreadUtils { * Create a cached thread pool whose max number of threads is `maxThreadNumber`. Thread names * are formatted as prefix-ID, where ID is a unique, sequentially assigned integer. 
*/ - def newDaemonCachedThreadPool(prefix: String, maxThreadNumber: Int): ThreadPoolExecutor = { + def newDaemonCachedThreadPool( + prefix: String, maxThreadNumber: Int, keepAliveSeconds: Int = 60): ThreadPoolExecutor = { val threadFactory = namedThreadFactory(prefix) -new ThreadPoolExecutor( - 0, maxThreadNumber, 60L, TimeUnit.SECONDS, new SynchronousQueue[Runnable], threadFactory) +val threadPool = new ThreadPoolExecutor( + maxThreadNumber, // corePoolSize: the max number of threads to create before queuing the tasks + maxThreadNumber, // maximumPoolSize: because we use LinkedBlockingDeque, this one is not used + keepAliveSeconds, + TimeUnit.SECONDS, + new LinkedBlockingQueue[Runnable], + threadFactory) +threadPool.allowCoreThreadTimeOut(true) +threadPool } /** http://git-wip-us.apache.org/repos/asf/spark/blob/7900d192/core/src/test/scala/org/apache/spark/util/ThreadUtilsSuite.scala -- diff --git a/core/src/test/scala/org/apache/spark/util/ThreadUtilsSuite.scala b/core/src/test/scala/org/apache/spark/util/ThreadUtilsSuite.scala index 620e4de..92ae038 100644 --- a/core/src/test/scala/org/apache/spark/util/ThreadUtilsSuite.scala +++ b/core/src/test/scala/org/apache/spark/util/ThreadUtilsSuite.scala @@ -24,6 +24,8 @@ import scala.concurrent.duration._ import scala.concurrent.{Await, Future} import scala.util.Random +import org.scalatest.concurrent.Eventually._ + import org.apache.spark.SparkFunSuite class ThreadUtilsSuite extends SparkFunSuite { @@ -59,6 +61,49 @@ class ThreadUtilsSuite extends SparkFunSuite { } } + test("newDaemonCachedThreadPool") { +val maxThreadNumber = 10 +val startThreadsLatch = new CountDownLatch(maxThreadNumber) +val latch = new CountDownLatch(1) +val cachedThreadPool = ThreadUtils.newDaemonCachedThreadPool( + "ThreadUtilsSuite-newDaemonCachedThreadPool", + maxThreadNumber, + keepAliveSeconds = 2) +try { + for (_ <- 1 to maxThreadNumber) { +cachedThreadPool.execute(new Runnable { + override def run(): Unit = { +startThreadsLatch.countDown() 
+latch.await(10, TimeUnit.SECONDS) + } +}) + } + startThreadsLatch.await(10, TimeUnit.SECONDS) + assert(cachedThreadPool.getActiveCount === maxThreadNumber) + assert(cachedThreadPool.getQueue.size === 0) + + // Submit a new task and it should be put into the queue since the thread number reaches the + // limitation +
spark git commit: [SPARK-11999][CORE] Fix the issue that ThreadUtils.newDaemonCachedThreadPool doesn't cache any task
Repository: spark Updated Branches: refs/heads/branch-1.4 1df3e8230 -> f5af299ab [SPARK-11999][CORE] Fix the issue that ThreadUtils.newDaemonCachedThreadPool doesn't cache any task In the previous code, `newDaemonCachedThreadPool` uses `SynchronousQueue`, which is wrong. `SynchronousQueue` is an empty queue that cannot cache any task. This patch uses `LinkedBlockingQueue` to fix it along with other fixes to make sure `newDaemonCachedThreadPool` can use at most `maxThreadNumber` threads, and after that, cache tasks to `LinkedBlockingQueue`. Author: Shixiong Zhu Closes #9978 from zsxwing/cached-threadpool. (cherry picked from commit d3ef693325f91a1ed340c9756c81244a80398eb2) Signed-off-by: Shixiong Zhu Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/f5af299a Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/f5af299a Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/f5af299a Branch: refs/heads/branch-1.4 Commit: f5af299ab06f654e000c99917c703f989bffaa43 Parents: 1df3e82 Author: Shixiong Zhu Authored: Wed Nov 25 23:31:21 2015 -0800 Committer: Shixiong Zhu Committed: Wed Nov 25 23:35:33 2015 -0800 -- .../org/apache/spark/util/ThreadUtils.scala | 14 -- .../apache/spark/util/ThreadUtilsSuite.scala| 45 2 files changed, 56 insertions(+), 3 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/f5af299a/core/src/main/scala/org/apache/spark/util/ThreadUtils.scala -- diff --git a/core/src/main/scala/org/apache/spark/util/ThreadUtils.scala b/core/src/main/scala/org/apache/spark/util/ThreadUtils.scala index ca5624a..94d581f 100644 --- a/core/src/main/scala/org/apache/spark/util/ThreadUtils.scala +++ b/core/src/main/scala/org/apache/spark/util/ThreadUtils.scala @@ -56,10 +56,18 @@ private[spark] object ThreadUtils { * Create a cached thread pool whose max number of threads is `maxThreadNumber`. Thread names * are formatted as prefix-ID, where ID is a unique, sequentially assigned integer. 
*/ - def newDaemonCachedThreadPool(prefix: String, maxThreadNumber: Int): ThreadPoolExecutor = { + def newDaemonCachedThreadPool( + prefix: String, maxThreadNumber: Int, keepAliveSeconds: Int = 60): ThreadPoolExecutor = { val threadFactory = namedThreadFactory(prefix) -new ThreadPoolExecutor( - 0, maxThreadNumber, 60L, TimeUnit.SECONDS, new SynchronousQueue[Runnable], threadFactory) +val threadPool = new ThreadPoolExecutor( + maxThreadNumber, // corePoolSize: the max number of threads to create before queuing the tasks + maxThreadNumber, // maximumPoolSize: because we use LinkedBlockingDeque, this one is not used + keepAliveSeconds, + TimeUnit.SECONDS, + new LinkedBlockingQueue[Runnable], + threadFactory) +threadPool.allowCoreThreadTimeOut(true) +threadPool } /** http://git-wip-us.apache.org/repos/asf/spark/blob/f5af299a/core/src/test/scala/org/apache/spark/util/ThreadUtilsSuite.scala -- diff --git a/core/src/test/scala/org/apache/spark/util/ThreadUtilsSuite.scala b/core/src/test/scala/org/apache/spark/util/ThreadUtilsSuite.scala index 8c51e6b..1fb6364 100644 --- a/core/src/test/scala/org/apache/spark/util/ThreadUtilsSuite.scala +++ b/core/src/test/scala/org/apache/spark/util/ThreadUtilsSuite.scala @@ -23,6 +23,8 @@ import java.util.concurrent.{CountDownLatch, TimeUnit} import scala.concurrent.{Await, Future} import scala.concurrent.duration._ +import org.scalatest.concurrent.Eventually._ + import org.apache.spark.SparkFunSuite class ThreadUtilsSuite extends SparkFunSuite { @@ -58,6 +60,49 @@ class ThreadUtilsSuite extends SparkFunSuite { } } + test("newDaemonCachedThreadPool") { +val maxThreadNumber = 10 +val startThreadsLatch = new CountDownLatch(maxThreadNumber) +val latch = new CountDownLatch(1) +val cachedThreadPool = ThreadUtils.newDaemonCachedThreadPool( + "ThreadUtilsSuite-newDaemonCachedThreadPool", + maxThreadNumber, + keepAliveSeconds = 2) +try { + for (_ <- 1 to maxThreadNumber) { +cachedThreadPool.execute(new Runnable { + override def run(): Unit = { 
+startThreadsLatch.countDown() +latch.await(10, TimeUnit.SECONDS) + } +}) + } + startThreadsLatch.await(10, TimeUnit.SECONDS) + assert(cachedThreadPool.getActiveCount === maxThreadNumber) + assert(cachedThreadPool.getQueue.size === 0) + + // Submit a new task and it should be put into the queue since the thread number reaches the +
spark git commit: [SPARK-11999][CORE] Fix the issue that ThreadUtils.newDaemonCachedThreadPool doesn't cache any task
Repository: spark Updated Branches: refs/heads/branch-1.6 7e7f2627f -> 0df6beccc [SPARK-11999][CORE] Fix the issue that ThreadUtils.newDaemonCachedThreadPool doesn't cache any task In the previous code, `newDaemonCachedThreadPool` uses `SynchronousQueue`, which is wrong. `SynchronousQueue` is an empty queue that cannot cache any task. This patch uses `LinkedBlockingQueue` to fix it along with other fixes to make sure `newDaemonCachedThreadPool` can use at most `maxThreadNumber` threads, and after that, cache tasks to `LinkedBlockingQueue`. Author: Shixiong Zhu Closes #9978 from zsxwing/cached-threadpool. (cherry picked from commit d3ef693325f91a1ed340c9756c81244a80398eb2) Signed-off-by: Shixiong Zhu Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/0df6becc Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/0df6becc Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/0df6becc Branch: refs/heads/branch-1.6 Commit: 0df6beccc84166a00c7c98929bf487d9cea68e1d Parents: 7e7f262 Author: Shixiong Zhu Authored: Wed Nov 25 23:31:21 2015 -0800 Committer: Shixiong Zhu Committed: Wed Nov 25 23:31:36 2015 -0800 -- .../org/apache/spark/util/ThreadUtils.scala | 14 -- .../apache/spark/util/ThreadUtilsSuite.scala| 45 2 files changed, 56 insertions(+), 3 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/0df6becc/core/src/main/scala/org/apache/spark/util/ThreadUtils.scala -- diff --git a/core/src/main/scala/org/apache/spark/util/ThreadUtils.scala b/core/src/main/scala/org/apache/spark/util/ThreadUtils.scala index 5328344..f9fbe2f 100644 --- a/core/src/main/scala/org/apache/spark/util/ThreadUtils.scala +++ b/core/src/main/scala/org/apache/spark/util/ThreadUtils.scala @@ -56,10 +56,18 @@ private[spark] object ThreadUtils { * Create a cached thread pool whose max number of threads is `maxThreadNumber`. Thread names * are formatted as prefix-ID, where ID is a unique, sequentially assigned integer. 
*/ - def newDaemonCachedThreadPool(prefix: String, maxThreadNumber: Int): ThreadPoolExecutor = { + def newDaemonCachedThreadPool( + prefix: String, maxThreadNumber: Int, keepAliveSeconds: Int = 60): ThreadPoolExecutor = { val threadFactory = namedThreadFactory(prefix) -new ThreadPoolExecutor( - 0, maxThreadNumber, 60L, TimeUnit.SECONDS, new SynchronousQueue[Runnable], threadFactory) +val threadPool = new ThreadPoolExecutor( + maxThreadNumber, // corePoolSize: the max number of threads to create before queuing the tasks + maxThreadNumber, // maximumPoolSize: because we use LinkedBlockingDeque, this one is not used + keepAliveSeconds, + TimeUnit.SECONDS, + new LinkedBlockingQueue[Runnable], + threadFactory) +threadPool.allowCoreThreadTimeOut(true) +threadPool } /** http://git-wip-us.apache.org/repos/asf/spark/blob/0df6becc/core/src/test/scala/org/apache/spark/util/ThreadUtilsSuite.scala -- diff --git a/core/src/test/scala/org/apache/spark/util/ThreadUtilsSuite.scala b/core/src/test/scala/org/apache/spark/util/ThreadUtilsSuite.scala index 620e4de..92ae038 100644 --- a/core/src/test/scala/org/apache/spark/util/ThreadUtilsSuite.scala +++ b/core/src/test/scala/org/apache/spark/util/ThreadUtilsSuite.scala @@ -24,6 +24,8 @@ import scala.concurrent.duration._ import scala.concurrent.{Await, Future} import scala.util.Random +import org.scalatest.concurrent.Eventually._ + import org.apache.spark.SparkFunSuite class ThreadUtilsSuite extends SparkFunSuite { @@ -59,6 +61,49 @@ class ThreadUtilsSuite extends SparkFunSuite { } } + test("newDaemonCachedThreadPool") { +val maxThreadNumber = 10 +val startThreadsLatch = new CountDownLatch(maxThreadNumber) +val latch = new CountDownLatch(1) +val cachedThreadPool = ThreadUtils.newDaemonCachedThreadPool( + "ThreadUtilsSuite-newDaemonCachedThreadPool", + maxThreadNumber, + keepAliveSeconds = 2) +try { + for (_ <- 1 to maxThreadNumber) { +cachedThreadPool.execute(new Runnable { + override def run(): Unit = { +startThreadsLatch.countDown() 
+latch.await(10, TimeUnit.SECONDS) + } +}) + } + startThreadsLatch.await(10, TimeUnit.SECONDS) + assert(cachedThreadPool.getActiveCount === maxThreadNumber) + assert(cachedThreadPool.getQueue.size === 0) + + // Submit a new task and it should be put into the queue since the thread number reaches the + // limitation +
spark git commit: [SPARK-11999][CORE] Fix the issue that ThreadUtils.newDaemonCachedThreadPool doesn't cache any task
Repository: spark Updated Branches: refs/heads/master 068b6438d -> d3ef69332 [SPARK-11999][CORE] Fix the issue that ThreadUtils.newDaemonCachedThreadPool doesn't cache any task In the previous code, `newDaemonCachedThreadPool` uses `SynchronousQueue`, which is wrong. `SynchronousQueue` is an empty queue that cannot cache any task. This patch uses `LinkedBlockingQueue` to fix it along with other fixes to make sure `newDaemonCachedThreadPool` can use at most `maxThreadNumber` threads, and after that, cache tasks to `LinkedBlockingQueue`. Author: Shixiong Zhu Closes #9978 from zsxwing/cached-threadpool. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/d3ef6933 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/d3ef6933 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/d3ef6933 Branch: refs/heads/master Commit: d3ef693325f91a1ed340c9756c81244a80398eb2 Parents: 068b643 Author: Shixiong Zhu Authored: Wed Nov 25 23:31:21 2015 -0800 Committer: Shixiong Zhu Committed: Wed Nov 25 23:31:21 2015 -0800 -- .../org/apache/spark/util/ThreadUtils.scala | 14 -- .../apache/spark/util/ThreadUtilsSuite.scala| 45 2 files changed, 56 insertions(+), 3 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/d3ef6933/core/src/main/scala/org/apache/spark/util/ThreadUtils.scala -- diff --git a/core/src/main/scala/org/apache/spark/util/ThreadUtils.scala b/core/src/main/scala/org/apache/spark/util/ThreadUtils.scala index 5328344..f9fbe2f 100644 --- a/core/src/main/scala/org/apache/spark/util/ThreadUtils.scala +++ b/core/src/main/scala/org/apache/spark/util/ThreadUtils.scala @@ -56,10 +56,18 @@ private[spark] object ThreadUtils { * Create a cached thread pool whose max number of threads is `maxThreadNumber`. Thread names * are formatted as prefix-ID, where ID is a unique, sequentially assigned integer. 
*/ - def newDaemonCachedThreadPool(prefix: String, maxThreadNumber: Int): ThreadPoolExecutor = { + def newDaemonCachedThreadPool( + prefix: String, maxThreadNumber: Int, keepAliveSeconds: Int = 60): ThreadPoolExecutor = { val threadFactory = namedThreadFactory(prefix) -new ThreadPoolExecutor( - 0, maxThreadNumber, 60L, TimeUnit.SECONDS, new SynchronousQueue[Runnable], threadFactory) +val threadPool = new ThreadPoolExecutor( + maxThreadNumber, // corePoolSize: the max number of threads to create before queuing the tasks + maxThreadNumber, // maximumPoolSize: because we use LinkedBlockingDeque, this one is not used + keepAliveSeconds, + TimeUnit.SECONDS, + new LinkedBlockingQueue[Runnable], + threadFactory) +threadPool.allowCoreThreadTimeOut(true) +threadPool } /** http://git-wip-us.apache.org/repos/asf/spark/blob/d3ef6933/core/src/test/scala/org/apache/spark/util/ThreadUtilsSuite.scala -- diff --git a/core/src/test/scala/org/apache/spark/util/ThreadUtilsSuite.scala b/core/src/test/scala/org/apache/spark/util/ThreadUtilsSuite.scala index 620e4de..92ae038 100644 --- a/core/src/test/scala/org/apache/spark/util/ThreadUtilsSuite.scala +++ b/core/src/test/scala/org/apache/spark/util/ThreadUtilsSuite.scala @@ -24,6 +24,8 @@ import scala.concurrent.duration._ import scala.concurrent.{Await, Future} import scala.util.Random +import org.scalatest.concurrent.Eventually._ + import org.apache.spark.SparkFunSuite class ThreadUtilsSuite extends SparkFunSuite { @@ -59,6 +61,49 @@ class ThreadUtilsSuite extends SparkFunSuite { } } + test("newDaemonCachedThreadPool") { +val maxThreadNumber = 10 +val startThreadsLatch = new CountDownLatch(maxThreadNumber) +val latch = new CountDownLatch(1) +val cachedThreadPool = ThreadUtils.newDaemonCachedThreadPool( + "ThreadUtilsSuite-newDaemonCachedThreadPool", + maxThreadNumber, + keepAliveSeconds = 2) +try { + for (_ <- 1 to maxThreadNumber) { +cachedThreadPool.execute(new Runnable { + override def run(): Unit = { +startThreadsLatch.countDown() 
+latch.await(10, TimeUnit.SECONDS) + } +}) + } + startThreadsLatch.await(10, TimeUnit.SECONDS) + assert(cachedThreadPool.getActiveCount === maxThreadNumber) + assert(cachedThreadPool.getQueue.size === 0) + + // Submit a new task and it should be put into the queue since the thread number reaches the + // limitation + cachedThreadPool.execute(new Runnable { +override def run(): Unit = { + latch.await(10, TimeUnit.SECONDS) +} +