spark git commit: [SPARK-11999][CORE] Fix the issue that ThreadUtils.newDaemonCachedThreadPool doesn't cache any task

2015-11-25 Thread zsxwing
Repository: spark
Updated Branches:
  refs/heads/branch-1.5 b1fcefca6 -> 7900d192e


[SPARK-11999][CORE] Fix the issue that ThreadUtils.newDaemonCachedThreadPool 
doesn't cache any task

In the previous code, `newDaemonCachedThreadPool` used a `SynchronousQueue`,
which is wrong: a `SynchronousQueue` has zero capacity, so it cannot cache
any task. This patch switches to a `LinkedBlockingQueue` and makes sure
`newDaemonCachedThreadPool` uses at most `maxThreadNumber` threads; beyond
that, tasks are cached in the `LinkedBlockingQueue`.
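To make the failure concrete: with the old configuration, once all `maxThreadNumber` threads are busy, `SynchronousQueue.offer` fails and the pool's default `AbortPolicy` throws instead of caching the task. A minimal standalone Scala sketch (not from the patch; the object name and the pool size of 2 are illustrative):

import java.util.concurrent._

object SynchronousQueueDemo {
  def main(args: Array[String]): Unit = {
    // Old configuration: 0 core threads, 2 max threads (illustrative), SynchronousQueue.
    val pool = new ThreadPoolExecutor(
      0, 2, 60L, TimeUnit.SECONDS, new SynchronousQueue[Runnable])
    val busy = new Runnable {
      override def run(): Unit = Thread.sleep(5000)
    }
    pool.execute(busy) // spawns thread 1
    pool.execute(busy) // spawns thread 2
    try {
      // SynchronousQueue.offer fails unless a worker is already waiting to take,
      // so the default AbortPolicy rejects this task instead of caching it.
      pool.execute(busy)
    } catch {
      case e: RejectedExecutionException => println(s"rejected: $e")
    } finally {
      pool.shutdownNow()
    }
  }
}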

Author: Shixiong Zhu 

Closes #9978 from zsxwing/cached-threadpool.

(cherry picked from commit d3ef693325f91a1ed340c9756c81244a80398eb2)
Signed-off-by: Shixiong Zhu 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/7900d192
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/7900d192
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/7900d192

Branch: refs/heads/branch-1.5
Commit: 7900d192e8adf501fbed0d0704d60d2c0e63a764
Parents: b1fcefc
Author: Shixiong Zhu 
Authored: Wed Nov 25 23:31:21 2015 -0800
Committer: Shixiong Zhu 
Committed: Wed Nov 25 23:31:53 2015 -0800

--
 .../org/apache/spark/util/ThreadUtils.scala  | 14 +++++++++++---
 .../apache/spark/util/ThreadUtilsSuite.scala | 45 +++++++++++++++++++++++++++++++++++++++++++++
 2 files changed, 56 insertions(+), 3 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/7900d192/core/src/main/scala/org/apache/spark/util/ThreadUtils.scala
--
diff --git a/core/src/main/scala/org/apache/spark/util/ThreadUtils.scala b/core/src/main/scala/org/apache/spark/util/ThreadUtils.scala
index 06976f8..3159ef7 100644
--- a/core/src/main/scala/org/apache/spark/util/ThreadUtils.scala
+++ b/core/src/main/scala/org/apache/spark/util/ThreadUtils.scala
@@ -57,10 +57,18 @@ private[spark] object ThreadUtils {
    * Create a cached thread pool whose max number of threads is `maxThreadNumber`. Thread names
    * are formatted as prefix-ID, where ID is a unique, sequentially assigned integer.
    */
-  def newDaemonCachedThreadPool(prefix: String, maxThreadNumber: Int): ThreadPoolExecutor = {
+  def newDaemonCachedThreadPool(
+      prefix: String, maxThreadNumber: Int, keepAliveSeconds: Int = 60): ThreadPoolExecutor = {
     val threadFactory = namedThreadFactory(prefix)
-    new ThreadPoolExecutor(
-      0, maxThreadNumber, 60L, TimeUnit.SECONDS, new SynchronousQueue[Runnable], threadFactory)
+    val threadPool = new ThreadPoolExecutor(
+      maxThreadNumber, // corePoolSize: the max number of threads to create before queuing the tasks
+      maxThreadNumber, // maximumPoolSize: because we use LinkedBlockingQueue, this one is not used
+      keepAliveSeconds,
+      TimeUnit.SECONDS,
+      new LinkedBlockingQueue[Runnable],
+      threadFactory)
+    threadPool.allowCoreThreadTimeOut(true)
+    threadPool
   }
 
   /**
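For contrast with the sketch above, a sketch of the fixed configuration on a plain JDK pool (again illustrative, not from the patch): the overflow task is parked in the `LinkedBlockingQueue` instead of being rejected.

import java.util.concurrent._

object LinkedBlockingQueueDemo {
  def main(args: Array[String]): Unit = {
    // Fixed configuration: core == max, unbounded LinkedBlockingQueue, and
    // core threads allowed to time out so an idle pool shrinks back to zero.
    val pool = new ThreadPoolExecutor(
      2, 2, 60L, TimeUnit.SECONDS, new LinkedBlockingQueue[Runnable])
    pool.allowCoreThreadTimeOut(true)
    val busy = new Runnable {
      override def run(): Unit = Thread.sleep(1000)
    }
    (1 to 3).foreach(_ => pool.execute(busy))
    Thread.sleep(100) // give the two workers a moment to start
    // Two tasks run immediately; the third waits in the queue.
    println(s"active=${pool.getActiveCount}, queued=${pool.getQueue.size}")
    pool.shutdown()
    pool.awaitTermination(10, TimeUnit.SECONDS)
  }
}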

http://git-wip-us.apache.org/repos/asf/spark/blob/7900d192/core/src/test/scala/org/apache/spark/util/ThreadUtilsSuite.scala
--
diff --git a/core/src/test/scala/org/apache/spark/util/ThreadUtilsSuite.scala b/core/src/test/scala/org/apache/spark/util/ThreadUtilsSuite.scala
index 620e4de..92ae038 100644
--- a/core/src/test/scala/org/apache/spark/util/ThreadUtilsSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/ThreadUtilsSuite.scala
@@ -24,6 +24,8 @@ import scala.concurrent.duration._
 import scala.concurrent.{Await, Future}
 import scala.util.Random
 
+import org.scalatest.concurrent.Eventually._
+
 import org.apache.spark.SparkFunSuite
 
 class ThreadUtilsSuite extends SparkFunSuite {
@@ -59,6 +61,49 @@ class ThreadUtilsSuite extends SparkFunSuite {
     }
   }
 
+  test("newDaemonCachedThreadPool") {
+    val maxThreadNumber = 10
+    val startThreadsLatch = new CountDownLatch(maxThreadNumber)
+    val latch = new CountDownLatch(1)
+    val cachedThreadPool = ThreadUtils.newDaemonCachedThreadPool(
+      "ThreadUtilsSuite-newDaemonCachedThreadPool",
+      maxThreadNumber,
+      keepAliveSeconds = 2)
+    try {
+      for (_ <- 1 to maxThreadNumber) {
+        cachedThreadPool.execute(new Runnable {
+          override def run(): Unit = {
+            startThreadsLatch.countDown()
+            latch.await(10, TimeUnit.SECONDS)
+          }
+        })
+      }
+      startThreadsLatch.await(10, TimeUnit.SECONDS)
+      assert(cachedThreadPool.getActiveCount === maxThreadNumber)
+      assert(cachedThreadPool.getQueue.size === 0)
+
+      // Submit a new task and it should be put into the queue since the thread number reaches the
+      // limitation

spark git commit: [SPARK-11999][CORE] Fix the issue that ThreadUtils.newDaemonCachedThreadPool doesn't cache any task

2015-11-25 Thread zsxwing
Repository: spark
Updated Branches:
  refs/heads/branch-1.4 1df3e8230 -> f5af299ab


[SPARK-11999][CORE] Fix the issue that ThreadUtils.newDaemonCachedThreadPool 
doesn't cache any task

In the previous code, `newDaemonCachedThreadPool` used a `SynchronousQueue`,
which is wrong: a `SynchronousQueue` has zero capacity, so it cannot cache
any task. This patch switches to a `LinkedBlockingQueue` and makes sure
`newDaemonCachedThreadPool` uses at most `maxThreadNumber` threads; beyond
that, tasks are cached in the `LinkedBlockingQueue`.
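Since `keepAliveSeconds` defaults to 60, existing callers are unaffected. Hypothetical REPL-style call sites (the prefix and sizes are ours, not from the patch):

// Default 60-second keep-alive, as before:
val pool1 = ThreadUtils.newDaemonCachedThreadPool("my-prefix", maxThreadNumber = 8)
// Explicit keep-alive, as the new test does with keepAliveSeconds = 2:
val pool2 = ThreadUtils.newDaemonCachedThreadPool("my-prefix", 8, keepAliveSeconds = 30)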

Author: Shixiong Zhu 

Closes #9978 from zsxwing/cached-threadpool.

(cherry picked from commit d3ef693325f91a1ed340c9756c81244a80398eb2)
Signed-off-by: Shixiong Zhu 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/f5af299a
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/f5af299a
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/f5af299a

Branch: refs/heads/branch-1.4
Commit: f5af299ab06f654e000c99917c703f989bffaa43
Parents: 1df3e82
Author: Shixiong Zhu 
Authored: Wed Nov 25 23:31:21 2015 -0800
Committer: Shixiong Zhu 
Committed: Wed Nov 25 23:35:33 2015 -0800

--
 .../org/apache/spark/util/ThreadUtils.scala  | 14 +++++++++++---
 .../apache/spark/util/ThreadUtilsSuite.scala | 45 +++++++++++++++++++++++++++++++++++++++++++++
 2 files changed, 56 insertions(+), 3 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/f5af299a/core/src/main/scala/org/apache/spark/util/ThreadUtils.scala
--
diff --git a/core/src/main/scala/org/apache/spark/util/ThreadUtils.scala b/core/src/main/scala/org/apache/spark/util/ThreadUtils.scala
index ca5624a..94d581f 100644
--- a/core/src/main/scala/org/apache/spark/util/ThreadUtils.scala
+++ b/core/src/main/scala/org/apache/spark/util/ThreadUtils.scala
@@ -56,10 +56,18 @@ private[spark] object ThreadUtils {
    * Create a cached thread pool whose max number of threads is `maxThreadNumber`. Thread names
    * are formatted as prefix-ID, where ID is a unique, sequentially assigned integer.
    */
-  def newDaemonCachedThreadPool(prefix: String, maxThreadNumber: Int): ThreadPoolExecutor = {
+  def newDaemonCachedThreadPool(
+      prefix: String, maxThreadNumber: Int, keepAliveSeconds: Int = 60): ThreadPoolExecutor = {
     val threadFactory = namedThreadFactory(prefix)
-    new ThreadPoolExecutor(
-      0, maxThreadNumber, 60L, TimeUnit.SECONDS, new SynchronousQueue[Runnable], threadFactory)
+    val threadPool = new ThreadPoolExecutor(
+      maxThreadNumber, // corePoolSize: the max number of threads to create before queuing the tasks
+      maxThreadNumber, // maximumPoolSize: because we use LinkedBlockingQueue, this one is not used
+      keepAliveSeconds,
+      TimeUnit.SECONDS,
+      new LinkedBlockingQueue[Runnable],
+      threadFactory)
+    threadPool.allowCoreThreadTimeOut(true)
+    threadPool
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/spark/blob/f5af299a/core/src/test/scala/org/apache/spark/util/ThreadUtilsSuite.scala
--
diff --git a/core/src/test/scala/org/apache/spark/util/ThreadUtilsSuite.scala b/core/src/test/scala/org/apache/spark/util/ThreadUtilsSuite.scala
index 8c51e6b..1fb6364 100644
--- a/core/src/test/scala/org/apache/spark/util/ThreadUtilsSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/ThreadUtilsSuite.scala
@@ -23,6 +23,8 @@ import java.util.concurrent.{CountDownLatch, TimeUnit}
 import scala.concurrent.{Await, Future}
 import scala.concurrent.duration._
 
+import org.scalatest.concurrent.Eventually._
+
 import org.apache.spark.SparkFunSuite
 
 class ThreadUtilsSuite extends SparkFunSuite {
@@ -58,6 +60,49 @@ class ThreadUtilsSuite extends SparkFunSuite {
     }
   }
 
+  test("newDaemonCachedThreadPool") {
+    val maxThreadNumber = 10
+    val startThreadsLatch = new CountDownLatch(maxThreadNumber)
+    val latch = new CountDownLatch(1)
+    val cachedThreadPool = ThreadUtils.newDaemonCachedThreadPool(
+      "ThreadUtilsSuite-newDaemonCachedThreadPool",
+      maxThreadNumber,
+      keepAliveSeconds = 2)
+    try {
+      for (_ <- 1 to maxThreadNumber) {
+        cachedThreadPool.execute(new Runnable {
+          override def run(): Unit = {
+            startThreadsLatch.countDown()
+            latch.await(10, TimeUnit.SECONDS)
+          }
+        })
+      }
+      startThreadsLatch.await(10, TimeUnit.SECONDS)
+      assert(cachedThreadPool.getActiveCount === maxThreadNumber)
+      assert(cachedThreadPool.getQueue.size === 0)
+
+      // Submit a new task and it should be put into the queue since the thread number reaches the
+      // limitation

spark git commit: [SPARK-11999][CORE] Fix the issue that ThreadUtils.newDaemonCachedThreadPool doesn't cache any task

2015-11-25 Thread zsxwing
Repository: spark
Updated Branches:
  refs/heads/branch-1.6 7e7f2627f -> 0df6beccc


[SPARK-11999][CORE] Fix the issue that ThreadUtils.newDaemonCachedThreadPool 
doesn't cache any task

In the previous code, `newDaemonCachedThreadPool` used a `SynchronousQueue`,
which is wrong: a `SynchronousQueue` has zero capacity, so it cannot cache
any task. This patch switches to a `LinkedBlockingQueue` and makes sure
`newDaemonCachedThreadPool` uses at most `maxThreadNumber` threads; beyond
that, tasks are cached in the `LinkedBlockingQueue`.
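A design note on the fix: with `corePoolSize == maximumPoolSize`, every worker is a core thread, and core threads ignore the keep-alive by default, so without `allowCoreThreadTimeOut(true)` an idle pool would hold all `maxThreadNumber` threads forever. A REPL-style sketch of that knob in isolation (plain JDK; sizes illustrative):

import java.util.concurrent.{LinkedBlockingQueue, ThreadPoolExecutor, TimeUnit}

val pool = new ThreadPoolExecutor(
  4, 4, 2L, TimeUnit.SECONDS, new LinkedBlockingQueue[Runnable])
// Without this call, the 4 core threads would ignore the 2-second keep-alive
// and stay alive indefinitely even when the queue is empty.
pool.allowCoreThreadTimeOut(true) // idle workers now exit after 2 seconds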

Author: Shixiong Zhu 

Closes #9978 from zsxwing/cached-threadpool.

(cherry picked from commit d3ef693325f91a1ed340c9756c81244a80398eb2)
Signed-off-by: Shixiong Zhu 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/0df6becc
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/0df6becc
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/0df6becc

Branch: refs/heads/branch-1.6
Commit: 0df6beccc84166a00c7c98929bf487d9cea68e1d
Parents: 7e7f262
Author: Shixiong Zhu 
Authored: Wed Nov 25 23:31:21 2015 -0800
Committer: Shixiong Zhu 
Committed: Wed Nov 25 23:31:36 2015 -0800

--
 .../org/apache/spark/util/ThreadUtils.scala  | 14 +++++++++++---
 .../apache/spark/util/ThreadUtilsSuite.scala | 45 +++++++++++++++++++++++++++++++++++++++++++++
 2 files changed, 56 insertions(+), 3 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/0df6becc/core/src/main/scala/org/apache/spark/util/ThreadUtils.scala
--
diff --git a/core/src/main/scala/org/apache/spark/util/ThreadUtils.scala b/core/src/main/scala/org/apache/spark/util/ThreadUtils.scala
index 5328344..f9fbe2f 100644
--- a/core/src/main/scala/org/apache/spark/util/ThreadUtils.scala
+++ b/core/src/main/scala/org/apache/spark/util/ThreadUtils.scala
@@ -56,10 +56,18 @@ private[spark] object ThreadUtils {
    * Create a cached thread pool whose max number of threads is `maxThreadNumber`. Thread names
    * are formatted as prefix-ID, where ID is a unique, sequentially assigned integer.
    */
-  def newDaemonCachedThreadPool(prefix: String, maxThreadNumber: Int): ThreadPoolExecutor = {
+  def newDaemonCachedThreadPool(
+      prefix: String, maxThreadNumber: Int, keepAliveSeconds: Int = 60): ThreadPoolExecutor = {
     val threadFactory = namedThreadFactory(prefix)
-    new ThreadPoolExecutor(
-      0, maxThreadNumber, 60L, TimeUnit.SECONDS, new SynchronousQueue[Runnable], threadFactory)
+    val threadPool = new ThreadPoolExecutor(
+      maxThreadNumber, // corePoolSize: the max number of threads to create before queuing the tasks
+      maxThreadNumber, // maximumPoolSize: because we use LinkedBlockingQueue, this one is not used
+      keepAliveSeconds,
+      TimeUnit.SECONDS,
+      new LinkedBlockingQueue[Runnable],
+      threadFactory)
+    threadPool.allowCoreThreadTimeOut(true)
+    threadPool
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/spark/blob/0df6becc/core/src/test/scala/org/apache/spark/util/ThreadUtilsSuite.scala
--
diff --git a/core/src/test/scala/org/apache/spark/util/ThreadUtilsSuite.scala b/core/src/test/scala/org/apache/spark/util/ThreadUtilsSuite.scala
index 620e4de..92ae038 100644
--- a/core/src/test/scala/org/apache/spark/util/ThreadUtilsSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/ThreadUtilsSuite.scala
@@ -24,6 +24,8 @@ import scala.concurrent.duration._
 import scala.concurrent.{Await, Future}
 import scala.util.Random
 
+import org.scalatest.concurrent.Eventually._
+
 import org.apache.spark.SparkFunSuite
 
 class ThreadUtilsSuite extends SparkFunSuite {
@@ -59,6 +61,49 @@ class ThreadUtilsSuite extends SparkFunSuite {
     }
   }
 
+  test("newDaemonCachedThreadPool") {
+    val maxThreadNumber = 10
+    val startThreadsLatch = new CountDownLatch(maxThreadNumber)
+    val latch = new CountDownLatch(1)
+    val cachedThreadPool = ThreadUtils.newDaemonCachedThreadPool(
+      "ThreadUtilsSuite-newDaemonCachedThreadPool",
+      maxThreadNumber,
+      keepAliveSeconds = 2)
+    try {
+      for (_ <- 1 to maxThreadNumber) {
+        cachedThreadPool.execute(new Runnable {
+          override def run(): Unit = {
+            startThreadsLatch.countDown()
+            latch.await(10, TimeUnit.SECONDS)
+          }
+        })
+      }
+      startThreadsLatch.await(10, TimeUnit.SECONDS)
+      assert(cachedThreadPool.getActiveCount === maxThreadNumber)
+      assert(cachedThreadPool.getQueue.size === 0)
+
+      // Submit a new task and it should be put into the queue since the thread number reaches the
+      // limitation

spark git commit: [SPARK-11999][CORE] Fix the issue that ThreadUtils.newDaemonCachedThreadPool doesn't cache any task

2015-11-25 Thread zsxwing
Repository: spark
Updated Branches:
  refs/heads/master 068b6438d -> d3ef69332


[SPARK-11999][CORE] Fix the issue that ThreadUtils.newDaemonCachedThreadPool 
doesn't cache any task

In the previous code, `newDaemonCachedThreadPool` used a `SynchronousQueue`,
which is wrong: a `SynchronousQueue` has zero capacity, so it cannot cache
any task. This patch switches to a `LinkedBlockingQueue` and makes sure
`newDaemonCachedThreadPool` uses at most `maxThreadNumber` threads; beyond
that, tasks are cached in the `LinkedBlockingQueue`.
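The new test below pins `maxThreadNumber` tasks on a `CountDownLatch` so every worker stays busy, then checks that one more task is queued rather than rejected; the `Eventually` import suggests it finally releases the latch and waits for the pool to shrink once the 2-second keep-alive expires. A rough REPL-style sketch of that saturation pattern (the `saturate` helper is ours, not the patch's):

import java.util.concurrent.{CountDownLatch, ThreadPoolExecutor, TimeUnit}

// Block n tasks on a shared latch so all workers stay busy; the caller can
// then assert on getActiveCount/getQueue before counting the latch down.
def saturate(pool: ThreadPoolExecutor, n: Int): CountDownLatch = {
  val release = new CountDownLatch(1)
  val started = new CountDownLatch(n)
  (1 to n).foreach { _ =>
    pool.execute(new Runnable {
      override def run(): Unit = {
        started.countDown()
        release.await(10, TimeUnit.SECONDS)
      }
    })
  }
  started.await(10, TimeUnit.SECONDS) // all n workers are now running
  release // count this down to let the workers finish
}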

Author: Shixiong Zhu 

Closes #9978 from zsxwing/cached-threadpool.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/d3ef6933
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/d3ef6933
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/d3ef6933

Branch: refs/heads/master
Commit: d3ef693325f91a1ed340c9756c81244a80398eb2
Parents: 068b643
Author: Shixiong Zhu 
Authored: Wed Nov 25 23:31:21 2015 -0800
Committer: Shixiong Zhu 
Committed: Wed Nov 25 23:31:21 2015 -0800

--
 .../org/apache/spark/util/ThreadUtils.scala  | 14 +++++++++++---
 .../apache/spark/util/ThreadUtilsSuite.scala | 45 +++++++++++++++++++++++++++++++++++++++++++++
 2 files changed, 56 insertions(+), 3 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/d3ef6933/core/src/main/scala/org/apache/spark/util/ThreadUtils.scala
--
diff --git a/core/src/main/scala/org/apache/spark/util/ThreadUtils.scala b/core/src/main/scala/org/apache/spark/util/ThreadUtils.scala
index 5328344..f9fbe2f 100644
--- a/core/src/main/scala/org/apache/spark/util/ThreadUtils.scala
+++ b/core/src/main/scala/org/apache/spark/util/ThreadUtils.scala
@@ -56,10 +56,18 @@ private[spark] object ThreadUtils {
    * Create a cached thread pool whose max number of threads is `maxThreadNumber`. Thread names
    * are formatted as prefix-ID, where ID is a unique, sequentially assigned integer.
    */
-  def newDaemonCachedThreadPool(prefix: String, maxThreadNumber: Int): ThreadPoolExecutor = {
+  def newDaemonCachedThreadPool(
+      prefix: String, maxThreadNumber: Int, keepAliveSeconds: Int = 60): ThreadPoolExecutor = {
     val threadFactory = namedThreadFactory(prefix)
-    new ThreadPoolExecutor(
-      0, maxThreadNumber, 60L, TimeUnit.SECONDS, new SynchronousQueue[Runnable], threadFactory)
+    val threadPool = new ThreadPoolExecutor(
+      maxThreadNumber, // corePoolSize: the max number of threads to create before queuing the tasks
+      maxThreadNumber, // maximumPoolSize: because we use LinkedBlockingQueue, this one is not used
+      keepAliveSeconds,
+      TimeUnit.SECONDS,
+      new LinkedBlockingQueue[Runnable],
+      threadFactory)
+    threadPool.allowCoreThreadTimeOut(true)
+    threadPool
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/spark/blob/d3ef6933/core/src/test/scala/org/apache/spark/util/ThreadUtilsSuite.scala
--
diff --git a/core/src/test/scala/org/apache/spark/util/ThreadUtilsSuite.scala b/core/src/test/scala/org/apache/spark/util/ThreadUtilsSuite.scala
index 620e4de..92ae038 100644
--- a/core/src/test/scala/org/apache/spark/util/ThreadUtilsSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/ThreadUtilsSuite.scala
@@ -24,6 +24,8 @@ import scala.concurrent.duration._
 import scala.concurrent.{Await, Future}
 import scala.util.Random
 
+import org.scalatest.concurrent.Eventually._
+
 import org.apache.spark.SparkFunSuite
 
 class ThreadUtilsSuite extends SparkFunSuite {
@@ -59,6 +61,49 @@ class ThreadUtilsSuite extends SparkFunSuite {
     }
   }
 
+  test("newDaemonCachedThreadPool") {
+    val maxThreadNumber = 10
+    val startThreadsLatch = new CountDownLatch(maxThreadNumber)
+    val latch = new CountDownLatch(1)
+    val cachedThreadPool = ThreadUtils.newDaemonCachedThreadPool(
+      "ThreadUtilsSuite-newDaemonCachedThreadPool",
+      maxThreadNumber,
+      keepAliveSeconds = 2)
+    try {
+      for (_ <- 1 to maxThreadNumber) {
+        cachedThreadPool.execute(new Runnable {
+          override def run(): Unit = {
+            startThreadsLatch.countDown()
+            latch.await(10, TimeUnit.SECONDS)
+          }
+        })
+      }
+      startThreadsLatch.await(10, TimeUnit.SECONDS)
+      assert(cachedThreadPool.getActiveCount === maxThreadNumber)
+      assert(cachedThreadPool.getQueue.size === 0)
+
+      // Submit a new task and it should be put into the queue since the thread number reaches the
+      // limitation
+      cachedThreadPool.execute(new Runnable {
+        override def run(): Unit = {
+          latch.await(10, TimeUnit.SECONDS)
+        }
+      })