spark git commit: [SPARK-22554][PYTHON] Add a config to control if PySpark should use daemon or not for workers

2017-11-19 Thread gurwls223
Repository: spark
Updated Branches:
  refs/heads/master b10837ab1 -> 57c5514de


[SPARK-22554][PYTHON] Add a config to control if PySpark should use daemon or not for workers

## What changes were proposed in this pull request?

This PR proposes to add a flag to control whether PySpark should use the daemon or not.

Actually, SparkR already has a flag for `useDaemon`:
https://github.com/apache/spark/blob/478fbc866fbfdb4439788583281863ecea14e8af/core/src/main/scala/org/apache/spark/api/r/RRunner.scala#L362

It would be great if PySpark had this flag too; it makes it easier to debug Windows-specific issues.
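
As a minimal sketch (not part of this patch; the app name is hypothetical), the daemon could be turned off through the new flag when building a `SparkConf`:

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Fall back to launching pyspark/worker.py directly per task
// instead of forking workers from pyspark/daemon.py.
val conf = new SparkConf()
  .setAppName("daemon-debug")                // hypothetical app name
  .set("spark.python.use.daemon", "false")   // defaults to true

val sc = new SparkContext(conf)
```

Note that on Windows the flag is ignored and workers are always launched directly, since the daemon relies on forking.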

## How was this patch tested?

Manually tested.

Author: hyukjinkwon 

Closes #19782 from HyukjinKwon/use-daemon-flag.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/57c5514d
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/57c5514d
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/57c5514d

Branch: refs/heads/master
Commit: 57c5514de9dba1c14e296f85fb13fef23ce8c73f
Parents: b10837a
Author: hyukjinkwon 
Authored: Mon Nov 20 13:34:06 2017 +0900
Committer: hyukjinkwon 
Committed: Mon Nov 20 13:34:06 2017 +0900

--
 .../org/apache/spark/api/python/PythonWorkerFactory.scala | 7 ++++++-
 1 file changed, 6 insertions(+), 1 deletion(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/57c5514d/core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala
--
diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala b/core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala
index fc595ae..f53c617 100644
--- a/core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala
+++ b/core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala
@@ -38,7 +38,12 @@ private[spark] class PythonWorkerFactory(pythonExec: String, envVars: Map[String
   // (pyspark/daemon.py) and tell it to fork new workers for our tasks. This daemon currently
   // only works on UNIX-based systems now because it uses signals for child management, so we can
   // also fall back to launching workers (pyspark/worker.py) directly.
-  val useDaemon = !System.getProperty("os.name").startsWith("Windows")
+  val useDaemon = {
+    val useDaemonEnabled = SparkEnv.get.conf.getBoolean("spark.python.use.daemon", true)
+
+    // This flag is ignored on Windows as it's unable to fork.
+    !System.getProperty("os.name").startsWith("Windows") && useDaemonEnabled
+  }
 
   var daemon: Process = null
   val daemonHost = InetAddress.getByAddress(Array(127, 0, 0, 1))
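
For context, a rough sketch (the method names are illustrative stubs, not quoted from this patch) of how a worker factory can branch on the resulting flag:

```scala
import java.net.Socket

// Daemon mode forks cheap workers from a long-lived pyspark/daemon.py
// process; direct mode pays full interpreter startup per worker but
// needs no fork, so it also works on Windows.
def create(useDaemon: Boolean): Socket =
  if (useDaemon) createThroughDaemon()  // UNIX-only: uses fork and signals
  else createSimpleWorker()             // launch pyspark/worker.py directly

def createThroughDaemon(): Socket = ???  // stub for the daemon launch path
def createSimpleWorker(): Socket = ???   // stub for the direct launch path
```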




spark git commit: [SPARK-22557][TEST] Use ThreadSignaler explicitly

2017-11-19 Thread gurwls223
Repository: spark
Updated Branches:
  refs/heads/master d54bfec2e -> b10837ab1


[SPARK-22557][TEST] Use ThreadSignaler explicitly

## What changes were proposed in this pull request?

ScalaTest 3.0 uses an implicit `Signaler`. This PR makes sure all Spark tests use `ThreadSignaler` explicitly, which has the same default behavior of interrupting a thread on the JVM as ScalaTest 2.2.x. This will reduce potential flakiness.
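
A minimal sketch of the pattern this patch applies across the suites (the suite and test names here are placeholders):

```scala
import org.scalatest.concurrent.{Signaler, ThreadSignaler, TimeLimits}
import org.scalatest.time.SpanSugar._

class ExampleSuite extends SparkFunSuite with TimeLimits {

  // Necessary to make ScalaTest 3.x interrupt a thread on the JVM like ScalaTest 2.2.x
  implicit val defaultSignaler: Signaler = ThreadSignaler

  test("finishes within the time limit") {
    failAfter(10.seconds) {
      // test body; ThreadSignaler interrupts this thread if it overruns
    }
  }
}
```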

## How was this patch tested?

This is a test-suite-only update. It should pass the Jenkins tests.

Author: Dongjoon Hyun 

Closes #19784 from dongjoon-hyun/use_thread_signaler.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/b10837ab
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/b10837ab
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/b10837ab

Branch: refs/heads/master
Commit: b10837ab1a7bef04bf7a2773b9e44ed9206643fe
Parents: d54bfec
Author: Dongjoon Hyun 
Authored: Mon Nov 20 13:32:01 2017 +0900
Committer: hyukjinkwon 
Committed: Mon Nov 20 13:32:01 2017 +0900

--
 .../test/scala/org/apache/spark/DistributedSuite.scala    |  7 +--
 core/src/test/scala/org/apache/spark/DriverSuite.scala    |  5 -
 core/src/test/scala/org/apache/spark/UnpersistSuite.scala |  8 ++--
 .../scala/org/apache/spark/deploy/SparkSubmitSuite.scala  |  9 -
 .../scala/org/apache/spark/rdd/AsyncRDDActionsSuite.scala |  5 -
 .../org/apache/spark/scheduler/DAGSchedulerSuite.scala    |  5 -
 .../OutputCommitCoordinatorIntegrationSuite.scala         |  5 -
 .../org/apache/spark/storage/BlockManagerSuite.scala      | 10 --
 .../test/scala/org/apache/spark/util/EventLoopSuite.scala |  5 -
 .../execution/streaming/ProcessingTimeExecutorSuite.scala |  8 +---
 .../scala/org/apache/spark/sql/streaming/StreamTest.scala |  2 ++
 .../org/apache/spark/sql/hive/SparkSubmitTestUtils.scala  |  5 -
 .../scala/org/apache/spark/streaming/ReceiverSuite.scala  |  5 +++--
 .../apache/spark/streaming/StreamingContextSuite.scala    |  5 +++--
 .../spark/streaming/receiver/BlockGeneratorSuite.scala    |  7 ---
 15 files changed, 68 insertions(+), 23 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/b10837ab/core/src/test/scala/org/apache/spark/DistributedSuite.scala
--
diff --git a/core/src/test/scala/org/apache/spark/DistributedSuite.scala b/core/src/test/scala/org/apache/spark/DistributedSuite.scala
index f800561..ea9f6d2 100644
--- a/core/src/test/scala/org/apache/spark/DistributedSuite.scala
+++ b/core/src/test/scala/org/apache/spark/DistributedSuite.scala
@@ -18,7 +18,7 @@
 package org.apache.spark
 
 import org.scalatest.Matchers
-import org.scalatest.concurrent.TimeLimits._
+import org.scalatest.concurrent.{Signaler, ThreadSignaler, TimeLimits}
 import org.scalatest.time.{Millis, Span}
 
 import org.apache.spark.security.EncryptionFunSuite
@@ -30,7 +30,10 @@ class NotSerializableExn(val notSer: NotSerializableClass) extends Throwable() {
 
 
 class DistributedSuite extends SparkFunSuite with Matchers with LocalSparkContext
-  with EncryptionFunSuite {
+  with EncryptionFunSuite with TimeLimits {
+
+  // Necessary to make ScalaTest 3.x interrupt a thread on the JVM like ScalaTest 2.2.x
+  implicit val defaultSignaler: Signaler = ThreadSignaler
 
   val clusterUrl = "local-cluster[2,1,1024]"
 

http://git-wip-us.apache.org/repos/asf/spark/blob/b10837ab/core/src/test/scala/org/apache/spark/DriverSuite.scala
--
diff --git a/core/src/test/scala/org/apache/spark/DriverSuite.scala b/core/src/test/scala/org/apache/spark/DriverSuite.scala
index be80d27..962945e 100644
--- a/core/src/test/scala/org/apache/spark/DriverSuite.scala
+++ b/core/src/test/scala/org/apache/spark/DriverSuite.scala
@@ -19,7 +19,7 @@ package org.apache.spark
 
 import java.io.File
 
-import org.scalatest.concurrent.TimeLimits
+import org.scalatest.concurrent.{Signaler, ThreadSignaler, TimeLimits}
 import org.scalatest.prop.TableDrivenPropertyChecks._
 import org.scalatest.time.SpanSugar._
 
@@ -27,6 +27,9 @@ import org.apache.spark.util.Utils
 
 class DriverSuite extends SparkFunSuite with TimeLimits {
 
+  // Necessary to make ScalaTest 3.x interrupt a thread on the JVM like ScalaTest 2.2.x
+  implicit val defaultSignaler: Signaler = ThreadSignaler
+
   ignore("driver should exit after finishing without cleanup (SPARK-530)") {
     val sparkHome = sys.props.getOrElse("spark.test.home", fail("spark.test.home is not set!"))
     val masters = Table("master", "local", "local-cluster[2,1,1024]")