[GitHub] spark pull request #20519: [Spark-23240][python] Don't let python site custo...

2018-02-22 Thread bersprockets
Github user bersprockets closed the pull request at:

https://github.com/apache/spark/pull/20519


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20519: [Spark-23240][python] Don't let python site custo...

2018-02-11 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/20519#discussion_r167424443
  
--- Diff: python/pyspark/daemon.py ---
@@ -177,4 +183,25 @@ def handle_sigterm(*args):
 
 
 if __name__ == '__main__':
-    manager()
--- End diff --

I think:


https://github.com/apache/spark/blob/87ffe7adddf517541aac0d1e8536b02ad8881606/python/test_coverage/coverage_daemon.py#L45

should be updated too BTW.





[GitHub] spark pull request #20519: [Spark-23240][python] Don't let python site custo...

2018-02-08 Thread bersprockets
Github user bersprockets commented on a diff in the pull request:

https://github.com/apache/spark/pull/20519#discussion_r167095485
  
--- Diff: core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala ---
@@ -180,18 +181,52 @@ private[spark] class PythonWorkerFactory(pythonExec: String, envVars: Map[String
 return
   }
 
+  var serverSocket: ServerSocket = null
   try {
+// get a server socket so that the launched daemon can tell us its server port
+serverSocket = new ServerSocket(0, 0, InetAddress.getByAddress(Array(127, 0, 0, 1)))
--- End diff --

>but it's generally a good idea to call setReuseAddress(true).

Do I want to do this since I am asking for some available port rather than 
binding to a known port?
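For illustration, here is a rough Python analog of the listener being discussed (the patch itself is Scala). Binding to port 0 on 127.0.0.1 lets the OS pick any free port, which is then read back and passed to the child. `SO_REUSEADDR` is the option behind Java's `setReuseAddress(true)`. It mainly helps when re-binding a fixed port whose previous connections are still in TIME_WAIT, so with an OS-assigned port it probably makes little practical difference. The JDK also documents the option's effect only when it is set before binding, and the three-argument `ServerSocket` constructor binds immediately. The helper name `open_loopback_listener` is hypothetical.

```python
import socket


def open_loopback_listener(timeout_s=10.0):
    """Hypothetical helper: listen on an OS-chosen port on 127.0.0.1.

    Rough analog of new ServerSocket(0, 0, <127.0.0.1>) plus
    setReuseAddress(true) and setSoTimeout(...) on the JVM side.
    """
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    try:
        # Counterpart of setReuseAddress(true); only meaningful before bind().
        sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        # Port 0 asks the OS for any free port; loopback keeps it local-only.
        sock.bind(("127.0.0.1", 0))
        sock.listen(1)
        # Counterpart of setSoTimeout: accept() raises socket.timeout
        # instead of blocking forever if the child never connects.
        sock.settimeout(timeout_s)
    except OSError:
        sock.close()
        raise
    return sock
```

Usage: `listener = open_loopback_listener()`, then `port = listener.getsockname()[1]` gives the number to hand to the child as `str(port)` on its command line.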





[GitHub] spark pull request #20519: [Spark-23240][python] Don't let python site custo...

2018-02-08 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/20519#discussion_r167028262
  
--- Diff: core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala ---
@@ -180,18 +181,52 @@ private[spark] class PythonWorkerFactory(pythonExec: String, envVars: Map[String
 return
   }
 
+  var serverSocket: ServerSocket = null
   try {
+// get a server socket so that the launched daemon can tell us its server port
+serverSocket = new ServerSocket(0, 0, InetAddress.getByAddress(Array(127, 0, 0, 1)))
--- End diff --

Oh, and this one can also be a `val` instantiated outside the try.





[GitHub] spark pull request #20519: [Spark-23240][python] Don't let python site custo...

2018-02-08 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/20519#discussion_r167018107
  
--- Diff: core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala ---
@@ -180,18 +181,52 @@ private[spark] class PythonWorkerFactory(pythonExec: String, envVars: Map[String
 return
   }
 
+  var serverSocket: ServerSocket = null
   try {
+// get a server socket so that the launched daemon can tell us its server port
+serverSocket = new ServerSocket(0, 0, InetAddress.getByAddress(Array(127, 0, 0, 1)))
+val serverPort = serverSocket.getLocalPort
+
+// generate an 'auth token' for the daemon to pass back to us. This will
+// allow us to confirm that the we are truly communicating with the newly
+// launched daemon
+val expectedAuthToken = (new Random()).nextInt()
+
 // Create and start the daemon
-val pb = new ProcessBuilder(Arrays.asList(pythonExec, "-m", daemonModule))
+val pb = new ProcessBuilder(Arrays.asList(pythonExec, "-m", daemonModule,
+  serverPort.toString))
 val workerEnv = pb.environment()
 workerEnv.putAll(envVars.asJava)
 workerEnv.put("PYTHONPATH", pythonPath)
 // This is equivalent to setting the -u flag; we use it because ipython doesn't support -u:
 workerEnv.put("PYTHONUNBUFFERED", "YES")
+workerEnv.put("PYSPARK_DAEMON_TOKEN", expectedAuthToken.toString)
 daemon = pb.start()
 
+// get the local port of the daemon's server socket,
+// but don't wait forever for the daemon to connect
+serverSocket.setSoTimeout(1)
+var socketToDaemon: Socket = null
--- End diff --

This one can be a `val ... = serverSocket.accept()`, and you don't need the 
null check in the finally for it.
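A Python sketch of the shape vanzin suggests (the real code is Scala, and the names here are hypothetical). The listener exists before the `try`, the accepted connection is bound exactly once, and nested `finally` blocks close each socket with no null/None checks, including when `accept()` times out. It reads a single 4-byte big-endian int, assumed to match `DataInputStream.readInt` on the JVM side; the auth-token check is left out to keep the focus on cleanup.

```python
import socket
import struct


def recv_exact(conn, n):
    """Read exactly n bytes, or raise EOFError if the peer closes early."""
    buf = b""
    while len(buf) < n:
        chunk = conn.recv(n - len(buf))
        if not chunk:
            raise EOFError("connection closed after %d of %d bytes" % (len(buf), n))
        buf += chunk
    return buf


def accept_daemon_port(listener):
    """Hypothetical parent-side step: accept one connection and read a 4-byte
    big-endian port. Both sockets are closed on every path, including when
    accept() raises socket.timeout."""
    try:
        # Bound once, like `val socketToDaemon = serverSocket.accept()`:
        # if accept() raises, there is no connection object to clean up.
        conn, _ = listener.accept()
        try:
            return struct.unpack("!i", recv_exact(conn, 4))[0]
        finally:
            conn.close()
    finally:
        listener.close()
```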





[GitHub] spark pull request #20519: [Spark-23240][python] Don't let python site custo...

2018-02-08 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/20519#discussion_r167015747
  
--- Diff: python/pyspark/daemon.py ---
@@ -177,4 +183,24 @@ def handle_sigterm(*args):
 
 
 if __name__ == '__main__':
-    manager()
+    if len(sys.argv) < 2:
+        print >> sys.stderr, "No parent port number specified"
+        sys.exit(1)
+    try:
+        parent_port = int(sys.argv[1])
+    except ValueError:
+        print >> sys.stderr, "Non-numeric port number specified:", sys.argv[1]
--- End diff --

I don't think this syntax works with Python 3.
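Under Python 3, `print` is a function, so `print >> sys.stderr, "..."` parses as a tuple whose first element tries to right-shift the `print` function, and it fails with a `TypeError` at runtime. Two spellings work on both 2.7 and 3: `print(..., file=sys.stderr)` after `from __future__ import print_function`, or plain `sys.stderr.write(...)`. A sketch of the argument check using the latter; `parse_parent_port` is a hypothetical name.

```python
import sys


def parse_parent_port(argv):
    """Hypothetical helper: return the parent's port from argv, or None after
    reporting the problem on stderr. sys.stderr.write behaves the same on
    Python 2.7 and 3, unlike the `print >> sys.stderr` statement."""
    if len(argv) < 2:
        sys.stderr.write("No parent port number specified\n")
        return None
    try:
        return int(argv[1])
    except ValueError:
        sys.stderr.write("Non-numeric port number specified: %s\n" % argv[1])
        return None
```

In `__main__`, the daemon would still call `sys.exit(1)` when this returns `None`.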





[GitHub] spark pull request #20519: [Spark-23240][python] Don't let python site custo...

2018-02-08 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/20519#discussion_r167014593
  
--- Diff: core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala ---
@@ -180,18 +181,52 @@ private[spark] class PythonWorkerFactory(pythonExec: String, envVars: Map[String
 return
   }
 
+  var serverSocket: ServerSocket = null
   try {
+// get a server socket so that the launched daemon can tell us its server port
+serverSocket = new ServerSocket(0, 0, InetAddress.getByAddress(Array(127, 0, 0, 1)))
--- End diff --

not sure it makes a difference here, but it's generally a good idea to call 
`setReuseAddress(true)`.





[GitHub] spark pull request #20519: [Spark-23240][python] Don't let python site custo...

2018-02-08 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/20519#discussion_r167013866
  
--- Diff: core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala ---
@@ -180,18 +181,52 @@ private[spark] class PythonWorkerFactory(pythonExec: String, envVars: Map[String
 return
   }
 
+  var serverSocket: ServerSocket = null
   try {
+// get a server socket so that the launched daemon can tell us its server port
+serverSocket = new ServerSocket(0, 0, InetAddress.getByAddress(Array(127, 0, 0, 1)))
+val serverPort = serverSocket.getLocalPort
+
+// generate an 'auth token' for the daemon to pass back to us. This will
+// allow us to confirm that the we are truly communicating with the newly
+// launched daemon
+val expectedAuthToken = (new Random()).nextInt()
--- End diff --

probably ok, but I'd use `SecureRandom`, just in case. Maybe even use a 
long for the token. Also the parentheses around `new` aren't necessary.
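In Python, the closest counterpart to Java's `SecureRandom` is the `secrets` module (Python 3.6+). Below is a sketch of a signed 64-bit token, as suggested above. It assumes the receiving side would read the token with `DataInputStream.readLong`, which expects a big-endian 8-byte two's-complement value; the `"!q"` struct format produces that layout. Function names are hypothetical.

```python
import secrets
import struct


def new_auth_token():
    """Hypothetical helper: a cryptographically strong, signed 64-bit token."""
    # randbits(64) is uniform over 0 .. 2**64 - 1; shifting by 2**63 maps it
    # uniformly onto Java's signed long range.
    return secrets.randbits(64) - 2 ** 63


def encode_token(token):
    # "!q": network (big-endian) byte order, signed 8-byte integer, the same
    # layout DataOutputStream.writeLong / DataInputStream.readLong use.
    return struct.pack("!q", token)


def decode_token(data):
    return struct.unpack("!q", data)[0]
```

The token would travel to the child as a decimal string in its environment (the patch uses `PYSPARK_DAEMON_TOKEN`) and come back as raw bytes on the socket.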





[GitHub] spark pull request #20519: [Spark-23240][python] Don't let python site custo...

2018-02-08 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/20519#discussion_r167015476
  
--- Diff: python/pyspark/daemon.py ---
@@ -79,11 +79,17 @@ def manager():
 listen_sock.listen(max(1024, SOMAXCONN))
 listen_host, listen_port = listen_sock.getsockname()
 
-# re-open stdin/stdout in 'wb' mode
+socket_to_parent = socket.socket(AF_INET, SOCK_STREAM)
+socket_to_parent.connect(('127.0.0.1', parent_port))
+outfile = socket_to_parent.makefile(mode="wb")
+write_int(token, outfile)
+write_int(listen_port, outfile)
+outfile.flush()
+outfile.close()
+socket_to_parent.close()
+
+# re-open stdin in 'wb' mode
--- End diff --

wb?





[GitHub] spark pull request #20519: [Spark-23240][python] Don't let python site custo...

2018-02-06 Thread bersprockets
Github user bersprockets commented on a diff in the pull request:

https://github.com/apache/spark/pull/20519#discussion_r166422848
  
--- Diff: core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala ---
@@ -181,17 +181,33 @@ private[spark] class PythonWorkerFactory(pythonExec: String, envVars: Map[String
   }
 
   try {
+// get a server socket so that the launched daemon can tell us its server port
+val serverSocket = new ServerSocket(0, 0, InetAddress.getByAddress(Array(127, 0, 0, 1)))
+val serverPort = serverSocket.getLocalPort
+
 // Create and start the daemon
-val pb = new ProcessBuilder(Arrays.asList(pythonExec, "-m", daemonModule))
+val pb = new ProcessBuilder(Arrays.asList(pythonExec, "-m", daemonModule,
+  serverPort.toString))
 val workerEnv = pb.environment()
 workerEnv.putAll(envVars.asJava)
 workerEnv.put("PYTHONPATH", pythonPath)
 // This is equivalent to setting the -u flag; we use it because ipython doesn't support -u:
 workerEnv.put("PYTHONUNBUFFERED", "YES")
 daemon = pb.start()
 
+// get the local port of the daemon's server socket,
+// but don't wait forever for the daemon to connect
+serverSocket.setSoTimeout(1)
+val socketToDaemon = serverSocket.accept()
--- End diff --

>Not to kill the action, but just to make sure sockets are closed when they 
should be.

Oh, of course, got it.





[GitHub] spark pull request #20519: [Spark-23240][python] Don't let python site custo...

2018-02-06 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/20519#discussion_r166422566
  
--- Diff: core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala ---
@@ -181,17 +181,33 @@ private[spark] class PythonWorkerFactory(pythonExec: String, envVars: Map[String
   }
 
   try {
+// get a server socket so that the launched daemon can tell us its server port
+val serverSocket = new ServerSocket(0, 0, InetAddress.getByAddress(Array(127, 0, 0, 1)))
+val serverPort = serverSocket.getLocalPort
+
 // Create and start the daemon
-val pb = new ProcessBuilder(Arrays.asList(pythonExec, "-m", daemonModule))
+val pb = new ProcessBuilder(Arrays.asList(pythonExec, "-m", daemonModule,
+  serverPort.toString))
 val workerEnv = pb.environment()
 workerEnv.putAll(envVars.asJava)
 workerEnv.put("PYTHONPATH", pythonPath)
 // This is equivalent to setting the -u flag; we use it because ipython doesn't support -u:
 workerEnv.put("PYTHONUNBUFFERED", "YES")
 daemon = pb.start()
 
+// get the local port of the daemon's server socket,
+// but don't wait forever for the daemon to connect
+serverSocket.setSoTimeout(1)
+val socketToDaemon = serverSocket.accept()
--- End diff --

Yeah, that's a reasonable way.





[GitHub] spark pull request #20519: [Spark-23240][python] Don't let python site custo...

2018-02-06 Thread bersprockets
Github user bersprockets commented on a diff in the pull request:

https://github.com/apache/spark/pull/20519#discussion_r166422170
  
--- Diff: core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala ---
@@ -181,17 +181,33 @@ private[spark] class PythonWorkerFactory(pythonExec: String, envVars: Map[String
   }
 
   try {
+// get a server socket so that the launched daemon can tell us its server port
+val serverSocket = new ServerSocket(0, 0, InetAddress.getByAddress(Array(127, 0, 0, 1)))
+val serverPort = serverSocket.getLocalPort
+
 // Create and start the daemon
-val pb = new ProcessBuilder(Arrays.asList(pythonExec, "-m", daemonModule))
+val pb = new ProcessBuilder(Arrays.asList(pythonExec, "-m", daemonModule,
+  serverPort.toString))
 val workerEnv = pb.environment()
 workerEnv.putAll(envVars.asJava)
 workerEnv.put("PYTHONPATH", pythonPath)
 // This is equivalent to setting the -u flag; we use it because ipython doesn't support -u:
 workerEnv.put("PYTHONUNBUFFERED", "YES")
 daemon = pb.start()
 
+// get the local port of the daemon's server socket,
+// but don't wait forever for the daemon to connect
+serverSocket.setSoTimeout(1)
+val socketToDaemon = serverSocket.accept()
--- End diff --

>You can add an auth token and expose it to the child in an env variable

How about a randomly generated integer passed to the daemon in the 
environment and then expected back as the first bit of data in the connection?
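A sketch of this check from the parent's side, in Python for illustration (the parent is Scala in the patch). The first bytes on the accepted connection must equal the token handed to the child, and anything else is rejected before the port is trusted. It assumes 4-byte big-endian ints, as in the `daemon.py` diff (`write_int(token, outfile)` followed by `write_int(listen_port, outfile)`). `hmac.compare_digest` gives a constant-time comparison, which is cheap insurance rather than a requirement here. Names are hypothetical.

```python
import hmac
import socket
import struct


def recv_exact(conn, n):
    """Read exactly n bytes, or raise EOFError if the peer closes early."""
    buf = b""
    while len(buf) < n:
        chunk = conn.recv(n - len(buf))
        if not chunk:
            raise EOFError("peer closed the connection early")
        buf += chunk
    return buf


def read_authenticated_port(conn, expected_token):
    """Hypothetical parent-side check: token first, then the daemon's port."""
    received = recv_exact(conn, 4)
    if not hmac.compare_digest(received, struct.pack("!i", expected_token)):
        # Some other local process connected; trust nothing else it sends.
        raise PermissionError("auth token mismatch")
    return struct.unpack("!i", recv_exact(conn, 4))[0]
```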





[GitHub] spark pull request #20519: [Spark-23240][python] Don't let python site custo...

2018-02-06 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/20519#discussion_r166421329
  
--- Diff: core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala ---
@@ -181,17 +181,33 @@ private[spark] class PythonWorkerFactory(pythonExec: String, envVars: Map[String
   }
 
   try {
+// get a server socket so that the launched daemon can tell us its server port
+val serverSocket = new ServerSocket(0, 0, InetAddress.getByAddress(Array(127, 0, 0, 1)))
+val serverPort = serverSocket.getLocalPort
+
 // Create and start the daemon
-val pb = new ProcessBuilder(Arrays.asList(pythonExec, "-m", daemonModule))
+val pb = new ProcessBuilder(Arrays.asList(pythonExec, "-m", daemonModule,
+  serverPort.toString))
 val workerEnv = pb.environment()
 workerEnv.putAll(envVars.asJava)
 workerEnv.put("PYTHONPATH", pythonPath)
 // This is equivalent to setting the -u flag; we use it because ipython doesn't support -u:
 workerEnv.put("PYTHONUNBUFFERED", "YES")
 daemon = pb.start()
 
+// get the local port of the daemon's server socket,
+// but don't wait forever for the daemon to connect
+serverSocket.setSoTimeout(1)
+val socketToDaemon = serverSocket.accept()
--- End diff --

Not to kill the action, but just to make sure sockets are closed when they 
should be.





[GitHub] spark pull request #20519: [Spark-23240][python] Don't let python site custo...

2018-02-06 Thread bersprockets
Github user bersprockets commented on a diff in the pull request:

https://github.com/apache/spark/pull/20519#discussion_r166420442
  
--- Diff: core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala ---
@@ -181,17 +181,33 @@ private[spark] class PythonWorkerFactory(pythonExec: String, envVars: Map[String
   }
 
   try {
+// get a server socket so that the launched daemon can tell us its server port
+val serverSocket = new ServerSocket(0, 0, InetAddress.getByAddress(Array(127, 0, 0, 1)))
+val serverPort = serverSocket.getLocalPort
+
 // Create and start the daemon
-val pb = new ProcessBuilder(Arrays.asList(pythonExec, "-m", daemonModule))
+val pb = new ProcessBuilder(Arrays.asList(pythonExec, "-m", daemonModule,
+  serverPort.toString))
 val workerEnv = pb.environment()
 workerEnv.putAll(envVars.asJava)
 workerEnv.put("PYTHONPATH", pythonPath)
 // This is equivalent to setting the -u flag; we use it because ipython doesn't support -u:
 workerEnv.put("PYTHONUNBUFFERED", "YES")
 daemon = pb.start()
 
+// get the local port of the daemon's server socket,
+// but don't wait forever for the daemon to connect
+serverSocket.setSoTimeout(1)
+val socketToDaemon = serverSocket.accept()
--- End diff --

>You also probably want some try..finally error handling to close the 
server socket, at least.

I take this to mean that we don't want an error from serverSocket.close() to 
kill the user's action. That makes sense to me; just checking.





[GitHub] spark pull request #20519: [Spark-23240][python] Don't let python site custo...

2018-02-06 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/20519#discussion_r166415978
  
--- Diff: core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala ---
@@ -181,17 +181,33 @@ private[spark] class PythonWorkerFactory(pythonExec: String, envVars: Map[String
   }
 
   try {
+// get a server socket so that the launched daemon can tell us its server port
+val serverSocket = new ServerSocket(0, 0, InetAddress.getByAddress(Array(127, 0, 0, 1)))
+val serverPort = serverSocket.getLocalPort
+
 // Create and start the daemon
-val pb = new ProcessBuilder(Arrays.asList(pythonExec, "-m", daemonModule))
+val pb = new ProcessBuilder(Arrays.asList(pythonExec, "-m", daemonModule,
+  serverPort.toString))
 val workerEnv = pb.environment()
 workerEnv.putAll(envVars.asJava)
 workerEnv.put("PYTHONPATH", pythonPath)
 // This is equivalent to setting the -u flag; we use it because ipython doesn't support -u:
 workerEnv.put("PYTHONUNBUFFERED", "YES")
 daemon = pb.start()
 
+// get the local port of the daemon's server socket,
+// but don't wait forever for the daemon to connect
+serverSocket.setSoTimeout(1)
+val socketToDaemon = serverSocket.accept()
--- End diff --

So, the problem with TCP sockets is: what if some other malicious process 
connects here instead of the process you expect to?

You can add an auth token and expose it to the child in an env variable, 
but that complicates the code a bit more. Sometimes I wish we could create 
proper pipes between processes in Java...

(You also probably want some `try..finally` error handling to close the 
server socket, at least.)





[GitHub] spark pull request #20519: [Spark-23240][python] Don't let python site custo...

2018-02-06 Thread bersprockets
GitHub user bersprockets opened a pull request:

https://github.com/apache/spark/pull/20519

[Spark-23240][python] Don't let python site customizations interfere with 
communication between PythonWorkerFactory and daemon.py

## What changes were proposed in this pull request?

Use a socket, instead of stdout, as the inter-process communication method 
between PythonWorkerFactory and a newly launched daemon.py, to avoid the issue 
described in the JIRA: site customizations in the Python installation can print 
extraneous messages to stdout, polluting the channel the daemon uses to report 
its listening port.
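Here is a minimal sketch of the daemon's half of that handshake, modeled on the `daemon.py` diff in this thread. The daemon connects to the parent's loopback port, sends the token and its own listening port, and closes the connection, so stdout is never involved. The `write_int` helper is a local stand-in assumed to match `pyspark.serializers.write_int` (a 4-byte big-endian signed int). `report_to_parent` is a hypothetical name.

```python
import socket
import struct


def write_int(value, stream):
    # Stand-in for pyspark.serializers.write_int: 4-byte big-endian signed
    # int, readable on the JVM with DataInputStream.readInt.
    stream.write(struct.pack("!i", value))


def report_to_parent(parent_port, token, listen_port):
    """Hypothetical daemon-side step: tell the parent which port we listen on."""
    sock = socket.create_connection(("127.0.0.1", parent_port))
    try:
        outfile = sock.makefile(mode="wb")
        try:
            write_int(token, outfile)        # lets the parent authenticate us
            write_int(listen_port, outfile)  # the port workers will be forked from
            outfile.flush()
        finally:
            outfile.close()
    finally:
        sock.close()
```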

## How was this patch tested?

- python/run-tests --modules "pyspark-core,pyspark-sql,pyspark-streaming"
- python/pyspark/tests.py
- all sbt tests
- selected queries on a Spark cluster that uses a Python installation with 
site customizations

Note: The two Python test scripts and the cluster test were run using Python 
2.7 only.



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/bersprockets/spark SPARK-23240_prop3

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/20519.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #20519


commit e34f987712e1e607604cebafb38d31014902da72
Author: Bruce Robbins 
Date:   2018-02-06T04:31:55Z

Don't let python site customizations interfere with communication between 
PythonWorkerFactory and daemon.py



