[GitHub] spark pull request #20151: [SPARK-22959][PYTHON] Configuration to select the...

2018-01-13 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/spark/pull/20151


---




[GitHub] spark pull request #20151: [SPARK-22959][PYTHON] Configuration to select the...

2018-01-10 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/20151#discussion_r160864049
  
--- Diff: core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala ---
@@ -34,17 +34,39 @@ private[spark] class PythonWorkerFactory(pythonExec: String, envVars: Map[String
 
   import PythonWorkerFactory._
 
-  // Because forking processes from Java is expensive, we prefer to launch a single Python daemon
-  // (pyspark/daemon.py) and tell it to fork new workers for our tasks. This daemon currently
-  // only works on UNIX-based systems now because it uses signals for child management, so we can
-  // also fall back to launching workers (pyspark/worker.py) directly.
+  // Because forking processes from Java is expensive, we prefer to launch a single Python daemon,
+  // pyspark/daemon.py (by default) and tell it to fork new workers for our tasks. This daemon
+  // currently only works on UNIX-based systems now because it uses signals for child management,
+  // so we can also fall back to launching workers, pyspark/worker.py (by default) directly.
   val useDaemon = {
     val useDaemonEnabled = SparkEnv.get.conf.getBoolean("spark.python.use.daemon", true)
 
     // This flag is ignored on Windows as it's unable to fork.
     !System.getProperty("os.name").startsWith("Windows") && useDaemonEnabled
   }
 
+  // WARN: Both configurations, 'spark.python.daemon.module' and 'spark.python.worker.module' are
+  // for very advanced users and they are experimental. This should be considered
+  // as expert-only option, and shouldn't be used before knowing what it means exactly.
+
+  // This configuration indicates the module to run the daemon to execute its Python workers.
+  val daemonModule = SparkEnv.get.conf.getOption("spark.python.daemon.module").map { value =>
--- End diff --

Hm, actually we could also check whether it's an empty string. But I wrote 
"shouldn't be used before knowing what it means exactly" above, so I think 
it's fine.


---




[GitHub] spark pull request #20151: [SPARK-22959][PYTHON] Configuration to select the...

2018-01-10 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/20151#discussion_r160817581
  
--- Diff: core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala ---
@@ -34,17 +34,39 @@ private[spark] class PythonWorkerFactory(pythonExec: String, envVars: Map[String
 
   import PythonWorkerFactory._
 
-  // Because forking processes from Java is expensive, we prefer to launch a single Python daemon
-  // (pyspark/daemon.py) and tell it to fork new workers for our tasks. This daemon currently
-  // only works on UNIX-based systems now because it uses signals for child management, so we can
-  // also fall back to launching workers (pyspark/worker.py) directly.
+  // Because forking processes from Java is expensive, we prefer to launch a single Python daemon,
+  // pyspark/daemon.py (by default) and tell it to fork new workers for our tasks. This daemon
+  // currently only works on UNIX-based systems now because it uses signals for child management,
+  // so we can also fall back to launching workers, pyspark/worker.py (by default) directly.
   val useDaemon = {
     val useDaemonEnabled = SparkEnv.get.conf.getBoolean("spark.python.use.daemon", true)
 
     // This flag is ignored on Windows as it's unable to fork.
     !System.getProperty("os.name").startsWith("Windows") && useDaemonEnabled
   }
 
+  // WARN: Both configurations, 'spark.python.daemon.module' and 'spark.python.worker.module' are
+  // for very advanced users and they are experimental. This should be considered
+  // as expert-only option, and shouldn't be used before knowing what it means exactly.
+
+  // This configuration indicates the module to run the daemon to execute its Python workers.
+  val daemonModule = SparkEnv.get.conf.getOption("spark.python.daemon.module").map { value =>
--- End diff --

Do we need to restrict the module's package to only allow something like 
`pyspark.*`?


---




[GitHub] spark pull request #20151: [SPARK-22959][PYTHON] Configuration to select the...

2018-01-10 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/20151#discussion_r160661749
  
--- Diff: core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala ---
@@ -34,17 +34,39 @@ private[spark] class PythonWorkerFactory(pythonExec: String, envVars: Map[String
 
   import PythonWorkerFactory._
 
-  // Because forking processes from Java is expensive, we prefer to launch a single Python daemon
-  // (pyspark/daemon.py) and tell it to fork new workers for our tasks. This daemon currently
-  // only works on UNIX-based systems now because it uses signals for child management, so we can
-  // also fall back to launching workers (pyspark/worker.py) directly.
+  // Because forking processes from Java is expensive, we prefer to launch a single Python daemon,
+  // pyspark/daemon.py (by default) and tell it to fork new workers for our tasks. This daemon
+  // currently only works on UNIX-based systems now because it uses signals for child management,
+  // so we can also fall back to launching workers, pyspark/worker.py (by default) directly.
   val useDaemon = {
     val useDaemonEnabled = SparkEnv.get.conf.getBoolean("spark.python.use.daemon", true)
 
     // This flag is ignored on Windows as it's unable to fork.
     !System.getProperty("os.name").startsWith("Windows") && useDaemonEnabled
   }
 
+  // WARN: Both configurations, 'spark.python.daemon.module' and 'spark.python.worker.module' are
+  // for very advanced users and they are experimental. This should be considered
+  // as expert-only option, and shouldn't be used before knowing what it means exactly.
+
+  // This configuration indicates the module to run the daemon to execute its Python workers.
+  val daemonModule = SparkEnv.get.conf.getOption("spark.python.daemon.module").map { value =>
+    logInfo(
+      s"Python daemon module in PySpark is set to [$value] in 'spark.python.daemon.module', " +
+      "using this to start the daemon up. Note that this configuration only has an effect when " +
+      "'spark.python.use.daemon' is enabled and the platform is not Windows.")
--- End diff --

Just double-checked: the log shows up only when the configuration is 
explicitly set:

```
18/01/10 21:23:24 INFO PythonWorkerFactory: Python daemon module in PySpark is set to [pyspark.daemon] in 'spark.python.daemon.module', using this to start the daemon up. Note that this configuration only has an effect when 'spark.python.use.daemon' is enabled and the platform is not Windows.
```


---




[GitHub] spark pull request #20151: [SPARK-22959][PYTHON] Configuration to select the...

2018-01-06 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/20151#discussion_r160021803
  
--- Diff: core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala ---
@@ -34,17 +34,25 @@ private[spark] class PythonWorkerFactory(pythonExec: String, envVars: Map[String
 
   import PythonWorkerFactory._
 
-  // Because forking processes from Java is expensive, we prefer to launch a single Python daemon
-  // (pyspark/daemon.py) and tell it to fork new workers for our tasks. This daemon currently
-  // only works on UNIX-based systems now because it uses signals for child management, so we can
-  // also fall back to launching workers (pyspark/worker.py) directly.
+  // Because forking processes from Java is expensive, we prefer to launch a single Python daemon,
+  // pyspark/daemon.py (by default) and tell it to fork new workers for our tasks. This daemon
+  // currently only works on UNIX-based systems now because it uses signals for child management,
+  // so we can also fall back to launching workers, pyspark/worker.py (by default) directly.
   val useDaemon = {
     val useDaemonEnabled = SparkEnv.get.conf.getBoolean("spark.python.use.daemon", true)
 
     // This flag is ignored on Windows as it's unable to fork.
     !System.getProperty("os.name").startsWith("Windows") && useDaemonEnabled
   }
 
+  // This configuration indicates the module to run the daemon to execute its Python workers.
+  val daemonModule = SparkEnv.get.conf.get("spark.python.daemon.module", "pyspark.daemon")
--- End diff --

Ah, yup, that's true in general. But please let me stick to "module" here, 
since that's what we actually execute (via `python -m`):

```
python --help
...
-m mod : run library module as a script (terminates option list)
...
```
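
Just for illustration (not from this patch), here is a tiny runnable sketch of what `-m` means, using the always-importable standard-library module `site` as a stand-in for the configured daemon module:

```python
import subprocess
import sys

# Run a library module as a script, the same way the configured daemon
# module (e.g. 'pyspark.daemon' by default) would be started with '-m'.
# 'site' is only a stand-in module here; it is guaranteed to be importable.
subprocess.run([sys.executable, "-m", "site"], check=True)
```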



---




[GitHub] spark pull request #20151: [SPARK-22959][PYTHON] Configuration to select the...

2018-01-04 Thread felixcheung
Github user felixcheung commented on a diff in the pull request:

https://github.com/apache/spark/pull/20151#discussion_r159819670
  
--- Diff: core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala ---
@@ -34,17 +34,25 @@ private[spark] class PythonWorkerFactory(pythonExec: String, envVars: Map[String
 
   import PythonWorkerFactory._
 
-  // Because forking processes from Java is expensive, we prefer to launch a single Python daemon
-  // (pyspark/daemon.py) and tell it to fork new workers for our tasks. This daemon currently
-  // only works on UNIX-based systems now because it uses signals for child management, so we can
-  // also fall back to launching workers (pyspark/worker.py) directly.
+  // Because forking processes from Java is expensive, we prefer to launch a single Python daemon,
+  // pyspark/daemon.py (by default) and tell it to fork new workers for our tasks. This daemon
+  // currently only works on UNIX-based systems now because it uses signals for child management,
+  // so we can also fall back to launching workers, pyspark/worker.py (by default) directly.
   val useDaemon = {
     val useDaemonEnabled = SparkEnv.get.conf.getBoolean("spark.python.use.daemon", true)
 
     // This flag is ignored on Windows as it's unable to fork.
     !System.getProperty("os.name").startsWith("Windows") && useDaemonEnabled
   }
 
+  // This configuration indicates the module to run the daemon to execute its Python workers.
+  val daemonModule = SparkEnv.get.conf.get("spark.python.daemon.module", "pyspark.daemon")
--- End diff --

Generally, I thought "command" is the name we use for the thing we execute.


---




[GitHub] spark pull request #20151: [SPARK-22959][PYTHON] Configuration to select the...

2018-01-04 Thread HyukjinKwon
GitHub user HyukjinKwon opened a pull request:

https://github.com/apache/spark/pull/20151

[SPARK-22959][PYTHON] Configuration to select the modules for daemon and 
worker in PySpark

## What changes were proposed in this pull request?

We are now forced to use `pyspark/daemon.py` and `pyspark/worker.py` in 
PySpark.

This doesn't allow any custom modification to them. For example, it's 
sometimes hard to debug what happens inside Python worker processes.

This is actually related to SPARK-7721 too, as Coverage is somehow unable 
to collect coverage across `os.fork`. With some custom fixes to force the 
coverage collection, it works fine.

This is also related to SPARK-20368. That JIRA describes Sentry support, 
which (roughly) needs some changes on the worker side.

With this configuration, advanced users will be able to plug in a lot of 
workarounds, and we can meet such potential needs in the future.

As an example, let's say I configure the module `coverage_daemon` and have 
`coverage_daemon.py` on the Python path:

```python
import os

from pyspark import daemon


if "COVERAGE_PROCESS_START" in os.environ:
    from pyspark.worker import main

    # Wrap the regular worker entry point so that each forked worker
    # starts, stops and saves its own coverage session.
    def _cov_wrapped(*args, **kwargs):
        import coverage
        cov = coverage.coverage(
            config_file=os.environ["COVERAGE_PROCESS_START"])
        cov.start()
        try:
            main(*args, **kwargs)
        finally:
            cov.stop()
            cov.save()
    daemon.worker_main = _cov_wrapped


if __name__ == '__main__':
    # Otherwise, fall through to the normal daemon loop.
    daemon.manager()
```

More importantly, we can leave the main code intact but allow some 
workarounds.
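
For completeness, here is a minimal, illustrative sketch (not part of this diff) of how such a custom module could be selected from the application side, assuming `coverage_daemon.py` above is importable on the executors' Python path; the option names are the ones proposed in this PR:

```python
from pyspark import SparkConf, SparkContext

# Illustrative only: select the custom daemon module sketched above.
# The worker module could be overridden the same way via
# 'spark.python.worker.module'.
conf = (SparkConf()
        .setAppName("coverage-daemon-example")
        .set("spark.python.daemon.module", "coverage_daemon"))
sc = SparkContext(conf=conf)
```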

## How was this patch tested?

Manually tested.


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/HyukjinKwon/spark configuration-daemon-worker

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/20151.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #20151


commit f74df4b566594152fa1efe1e3fb6033cbcf3993b
Author: hyukjinkwon 
Date:   2018-01-04T12:39:56Z

Configuration to select the modules for daemon and worker in PySpark




---
