[GitHub] spark pull request #20519: [Spark-23240][python] Don't let python site custo...
Github user bersprockets closed the pull request at: https://github.com/apache/spark/pull/20519

---
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/20519#discussion_r167424443

Diff: python/pyspark/daemon.py

```diff
@@ -177,4 +183,25 @@ def handle_sigterm(*args):
 
 if __name__ == '__main__':
-    manager()
```

I think https://github.com/apache/spark/blob/87ffe7adddf517541aac0d1e8536b02ad8881606/python/test_coverage/coverage_daemon.py#L45 should be updated too, BTW.
Github user bersprockets commented on a diff in the pull request: https://github.com/apache/spark/pull/20519#discussion_r167095485

Diff: core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala

```diff
@@ -180,18 +181,52 @@ private[spark] class PythonWorkerFactory(pythonExec: String, envVars: Map[String
       return
     }
 
+    var serverSocket: ServerSocket = null
     try {
+      // get a server socket so that the launched daemon can tell us its server port
+      serverSocket = new ServerSocket(0, 0, InetAddress.getByAddress(Array(127, 0, 0, 1)))
```

> but it's generally a good idea to call setReuseAddress(true).

Do I want to do this, since I am asking for some available port rather than binding to a known port?
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/20519#discussion_r167028262

Diff: core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala

```diff
@@ -180,18 +181,52 @@ private[spark] class PythonWorkerFactory(pythonExec: String, envVars: Map[String
       return
     }
 
+    var serverSocket: ServerSocket = null
     try {
+      // get a server socket so that the launched daemon can tell us its server port
+      serverSocket = new ServerSocket(0, 0, InetAddress.getByAddress(Array(127, 0, 0, 1)))
```

Oh, and this one can also be a `val` instantiated outside the try.
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/20519#discussion_r167018107

Diff: core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala

```diff
@@ -180,18 +181,52 @@ private[spark] class PythonWorkerFactory(pythonExec: String, envVars: Map[String
       return
     }
 
+    var serverSocket: ServerSocket = null
     try {
+      // get a server socket so that the launched daemon can tell us its server port
+      serverSocket = new ServerSocket(0, 0, InetAddress.getByAddress(Array(127, 0, 0, 1)))
+      val serverPort = serverSocket.getLocalPort
+
+      // generate an 'auth token' for the daemon to pass back to us. This will
+      // allow us to confirm that we are truly communicating with the newly
+      // launched daemon
+      val expectedAuthToken = (new Random()).nextInt()
+
       // Create and start the daemon
-      val pb = new ProcessBuilder(Arrays.asList(pythonExec, "-m", daemonModule))
+      val pb = new ProcessBuilder(Arrays.asList(pythonExec, "-m", daemonModule,
+        serverPort.toString))
       val workerEnv = pb.environment()
       workerEnv.putAll(envVars.asJava)
       workerEnv.put("PYTHONPATH", pythonPath)
       // This is equivalent to setting the -u flag; we use it because ipython doesn't support -u:
       workerEnv.put("PYTHONUNBUFFERED", "YES")
+      workerEnv.put("PYSPARK_DAEMON_TOKEN", expectedAuthToken.toString)
       daemon = pb.start()
 
+      // get the local port of the daemon's server socket,
+      // but don't wait forever for the daemon to connect
+      serverSocket.setSoTimeout(1)
+      var socketToDaemon: Socket = null
```

This one can be a `val ... = serverSocket.accept()`, and you don't need the null check in the finally for it.
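For context, the parent-side logic in the Scala hunk above (bind a loopback-only server socket, accept with a timeout, verify the auth token, then read the daemon's port) can be sketched in Python. This is an illustrative analog, not the patch itself: `wait_for_daemon` is a hypothetical name, and PySpark's `read_int` framing is approximated with `struct`.

```python
import socket
import struct

def read_int(stream):
    # Read one big-endian 4-byte signed int (the framing used by
    # PySpark's read_int/write_int helpers).
    data = stream.read(4)
    if len(data) < 4:
        raise EOFError("stream closed before a full int arrived")
    return struct.unpack(">i", data)[0]

def wait_for_daemon(server_sock, expected_token, timeout=10.0):
    # Accept the daemon's connection on our loopback-only server socket,
    # but don't wait forever, and verify the auth token before trusting
    # anything else on the connection.
    server_sock.settimeout(timeout)
    conn, _ = server_sock.accept()
    try:
        infile = conn.makefile(mode="rb")
        if read_int(infile) != expected_token:
            raise RuntimeError("unexpected auth token from daemon")
        # The next int is the port of the daemon's own server socket.
        return read_int(infile)
    finally:
        conn.close()
```

If the token does not match, the connection is rejected before the reported port is used, which is the point of the handshake.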
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/20519#discussion_r167015747

Diff: python/pyspark/daemon.py

```diff
@@ -177,4 +183,24 @@ def handle_sigterm(*args):
 
 if __name__ == '__main__':
-    manager()
+    if len(sys.argv) < 2:
+        print >> sys.stderr, "No parent port number specified"
+        sys.exit(1)
+    try:
+        parent_port = int(sys.argv[1])
+    except ValueError:
+        print >> sys.stderr, "Non-numeric port number specified:", sys.argv[1]
```

I don't think this syntax works with Python 3.
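Indeed, `print >> sys.stderr, ...` is Python 2-only statement syntax. A portable sketch using the `print` function, which works on both interpreters (the `fail` helper is a hypothetical name, not part of the patch):

```python
from __future__ import print_function  # no-op on Python 3

import sys

def fail(msg, *args):
    # Write the error to stderr using the print *function*, which is
    # valid on both Python 2 and Python 3, then exit non-zero.
    print(msg, *args, file=sys.stderr)
    sys.exit(1)
```

Usage would look like `fail("Non-numeric port number specified:", sys.argv[1])`.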
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/20519#discussion_r167014593

Diff: core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala

```diff
@@ -180,18 +181,52 @@ private[spark] class PythonWorkerFactory(pythonExec: String, envVars: Map[String
       return
     }
 
+    var serverSocket: ServerSocket = null
     try {
+      // get a server socket so that the launched daemon can tell us its server port
+      serverSocket = new ServerSocket(0, 0, InetAddress.getByAddress(Array(127, 0, 0, 1)))
```

Not sure it makes a difference here, but it's generally a good idea to call `setReuseAddress(true)`.
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/20519#discussion_r167013866

Diff: core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala

```diff
@@ -180,18 +181,52 @@ private[spark] class PythonWorkerFactory(pythonExec: String, envVars: Map[String
       return
     }
 
+    var serverSocket: ServerSocket = null
     try {
+      // get a server socket so that the launched daemon can tell us its server port
+      serverSocket = new ServerSocket(0, 0, InetAddress.getByAddress(Array(127, 0, 0, 1)))
+      val serverPort = serverSocket.getLocalPort
+
+      // generate an 'auth token' for the daemon to pass back to us. This will
+      // allow us to confirm that we are truly communicating with the newly
+      // launched daemon
+      val expectedAuthToken = (new Random()).nextInt()
```

Probably ok, but I'd use `SecureRandom`, just in case. Maybe even use a long for the token. Also, the parentheses around `new` aren't necessary.
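On the JVM that suggestion maps to `java.security.SecureRandom` (e.g. `new SecureRandom().nextLong()`). The same idea expressed in Python terms, purely for illustration since the real token is generated on the Scala side, is to draw the token from the OS CSPRNG and widen it to 64 bits:

```python
import os
import struct

def secure_token():
    # Draw 8 random bytes from the OS CSPRNG (os.urandom) and interpret
    # them as a big-endian signed 64-bit int -- analogous to replacing
    # new Random().nextInt() with new SecureRandom().nextLong().
    return struct.unpack(">q", os.urandom(8))[0]
```

A seeded PRNG's output can in principle be predicted by another local process; a CSPRNG token cannot, and 64 bits makes guessing impractical.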
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/20519#discussion_r167015476

Diff: python/pyspark/daemon.py

```diff
@@ -79,11 +79,17 @@ def manager():
     listen_sock.listen(max(1024, SOMAXCONN))
     listen_host, listen_port = listen_sock.getsockname()
 
-    # re-open stdin/stdout in 'wb' mode
+    socket_to_parent = socket.socket(AF_INET, SOCK_STREAM)
+    socket_to_parent.connect(('127.0.0.1', parent_port))
+    outfile = socket_to_parent.makefile(mode="wb")
+    write_int(token, outfile)
+    write_int(listen_port, outfile)
+    outfile.flush()
+    outfile.close()
+    socket_to_parent.close()
+
+    # re-open stdin in 'wb' mode
```

wb?
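For context, the daemon's side of the handshake quoted above can be sketched as a standalone function. This is a simplified stand-in: `report_port_to_parent` is a hypothetical name, and `write_int`'s big-endian 4-byte framing is inlined with `struct`.

```python
import os
import socket
import struct

def report_port_to_parent(parent_port, listen_port):
    # Connect back to the parent's loopback-only server socket and send
    # the auth token (handed to us by the parent via the environment),
    # followed by our own listen port, each as a big-endian 4-byte int.
    token = int(os.environ["PYSPARK_DAEMON_TOKEN"])
    socket_to_parent = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    try:
        socket_to_parent.connect(("127.0.0.1", parent_port))
        outfile = socket_to_parent.makefile(mode="wb")
        outfile.write(struct.pack(">i", token))
        outfile.write(struct.pack(">i", listen_port))
        outfile.flush()
        outfile.close()
    finally:
        socket_to_parent.close()
```

Because the port now travels over this dedicated socket rather than stdout, site customizations that print to stdout at interpreter startup can no longer corrupt the handshake.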
Github user bersprockets commented on a diff in the pull request: https://github.com/apache/spark/pull/20519#discussion_r166422848

Diff: core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala

```diff
@@ -181,17 +181,33 @@ private[spark] class PythonWorkerFactory(pythonExec: String, envVars: Map[String
     }
 
     try {
+      // get a server socket so that the launched daemon can tell us its server port
+      val serverSocket = new ServerSocket(0, 0, InetAddress.getByAddress(Array(127, 0, 0, 1)))
+      val serverPort = serverSocket.getLocalPort
+
       // Create and start the daemon
-      val pb = new ProcessBuilder(Arrays.asList(pythonExec, "-m", daemonModule))
+      val pb = new ProcessBuilder(Arrays.asList(pythonExec, "-m", daemonModule,
+        serverPort.toString))
       val workerEnv = pb.environment()
       workerEnv.putAll(envVars.asJava)
       workerEnv.put("PYTHONPATH", pythonPath)
       // This is equivalent to setting the -u flag; we use it because ipython doesn't support -u:
       workerEnv.put("PYTHONUNBUFFERED", "YES")
       daemon = pb.start()
 
+      // get the local port of the daemon's server socket,
+      // but don't wait forever for the daemon to connect
+      serverSocket.setSoTimeout(1)
+      val socketToDaemon = serverSocket.accept()
```

> Not to kill the action, but just to make sure sockets are closed when they should.

Oh, of course, got it.
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/20519#discussion_r166422566

Diff: core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala

```diff
@@ -181,17 +181,33 @@ private[spark] class PythonWorkerFactory(pythonExec: String, envVars: Map[String
     }
 
     try {
+      // get a server socket so that the launched daemon can tell us its server port
+      val serverSocket = new ServerSocket(0, 0, InetAddress.getByAddress(Array(127, 0, 0, 1)))
+      val serverPort = serverSocket.getLocalPort
+
       // Create and start the daemon
-      val pb = new ProcessBuilder(Arrays.asList(pythonExec, "-m", daemonModule))
+      val pb = new ProcessBuilder(Arrays.asList(pythonExec, "-m", daemonModule,
+        serverPort.toString))
       val workerEnv = pb.environment()
       workerEnv.putAll(envVars.asJava)
       workerEnv.put("PYTHONPATH", pythonPath)
       // This is equivalent to setting the -u flag; we use it because ipython doesn't support -u:
       workerEnv.put("PYTHONUNBUFFERED", "YES")
       daemon = pb.start()
 
+      // get the local port of the daemon's server socket,
+      // but don't wait forever for the daemon to connect
+      serverSocket.setSoTimeout(1)
+      val socketToDaemon = serverSocket.accept()
```

Yeah, that's a reasonable way.
Github user bersprockets commented on a diff in the pull request: https://github.com/apache/spark/pull/20519#discussion_r166422170

Diff: core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala

```diff
@@ -181,17 +181,33 @@ private[spark] class PythonWorkerFactory(pythonExec: String, envVars: Map[String
     }
 
     try {
+      // get a server socket so that the launched daemon can tell us its server port
+      val serverSocket = new ServerSocket(0, 0, InetAddress.getByAddress(Array(127, 0, 0, 1)))
+      val serverPort = serverSocket.getLocalPort
+
       // Create and start the daemon
-      val pb = new ProcessBuilder(Arrays.asList(pythonExec, "-m", daemonModule))
+      val pb = new ProcessBuilder(Arrays.asList(pythonExec, "-m", daemonModule,
+        serverPort.toString))
       val workerEnv = pb.environment()
       workerEnv.putAll(envVars.asJava)
       workerEnv.put("PYTHONPATH", pythonPath)
       // This is equivalent to setting the -u flag; we use it because ipython doesn't support -u:
       workerEnv.put("PYTHONUNBUFFERED", "YES")
       daemon = pb.start()
 
+      // get the local port of the daemon's server socket,
+      // but don't wait forever for the daemon to connect
+      serverSocket.setSoTimeout(1)
+      val socketToDaemon = serverSocket.accept()
```

> You can add an auth token and expose it to the child in an env variable

How about a randomly generated integer, passed to the daemon in the environment and then expected back as the first bit of data in the connection?
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/20519#discussion_r166421329

Diff: core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala

```diff
@@ -181,17 +181,33 @@ private[spark] class PythonWorkerFactory(pythonExec: String, envVars: Map[String
     }
 
     try {
+      // get a server socket so that the launched daemon can tell us its server port
+      val serverSocket = new ServerSocket(0, 0, InetAddress.getByAddress(Array(127, 0, 0, 1)))
+      val serverPort = serverSocket.getLocalPort
+
       // Create and start the daemon
-      val pb = new ProcessBuilder(Arrays.asList(pythonExec, "-m", daemonModule))
+      val pb = new ProcessBuilder(Arrays.asList(pythonExec, "-m", daemonModule,
+        serverPort.toString))
       val workerEnv = pb.environment()
       workerEnv.putAll(envVars.asJava)
       workerEnv.put("PYTHONPATH", pythonPath)
       // This is equivalent to setting the -u flag; we use it because ipython doesn't support -u:
       workerEnv.put("PYTHONUNBUFFERED", "YES")
       daemon = pb.start()
 
+      // get the local port of the daemon's server socket,
+      // but don't wait forever for the daemon to connect
+      serverSocket.setSoTimeout(1)
+      val socketToDaemon = serverSocket.accept()
```

Not to kill the action, but just to make sure sockets are closed when they should.
Github user bersprockets commented on a diff in the pull request: https://github.com/apache/spark/pull/20519#discussion_r166420442

Diff: core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala

```diff
@@ -181,17 +181,33 @@ private[spark] class PythonWorkerFactory(pythonExec: String, envVars: Map[String
     }
 
     try {
+      // get a server socket so that the launched daemon can tell us its server port
+      val serverSocket = new ServerSocket(0, 0, InetAddress.getByAddress(Array(127, 0, 0, 1)))
+      val serverPort = serverSocket.getLocalPort
+
       // Create and start the daemon
-      val pb = new ProcessBuilder(Arrays.asList(pythonExec, "-m", daemonModule))
+      val pb = new ProcessBuilder(Arrays.asList(pythonExec, "-m", daemonModule,
+        serverPort.toString))
       val workerEnv = pb.environment()
       workerEnv.putAll(envVars.asJava)
       workerEnv.put("PYTHONPATH", pythonPath)
       // This is equivalent to setting the -u flag; we use it because ipython doesn't support -u:
       workerEnv.put("PYTHONUNBUFFERED", "YES")
       daemon = pb.start()
 
+      // get the local port of the daemon's server socket,
+      // but don't wait forever for the daemon to connect
+      serverSocket.setSoTimeout(1)
+      val socketToDaemon = serverSocket.accept()
```

> You also probably want some try..finally error handling to close the server socket, at least.

I take this to mean we don't want some serverSocket.close() error to kill the user's action. That makes sense to me; just checking.
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/20519#discussion_r166415978

Diff: core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala

```diff
@@ -181,17 +181,33 @@ private[spark] class PythonWorkerFactory(pythonExec: String, envVars: Map[String
     }
 
     try {
+      // get a server socket so that the launched daemon can tell us its server port
+      val serverSocket = new ServerSocket(0, 0, InetAddress.getByAddress(Array(127, 0, 0, 1)))
+      val serverPort = serverSocket.getLocalPort
+
       // Create and start the daemon
-      val pb = new ProcessBuilder(Arrays.asList(pythonExec, "-m", daemonModule))
+      val pb = new ProcessBuilder(Arrays.asList(pythonExec, "-m", daemonModule,
+        serverPort.toString))
       val workerEnv = pb.environment()
       workerEnv.putAll(envVars.asJava)
       workerEnv.put("PYTHONPATH", pythonPath)
       // This is equivalent to setting the -u flag; we use it because ipython doesn't support -u:
       workerEnv.put("PYTHONUNBUFFERED", "YES")
       daemon = pb.start()
 
+      // get the local port of the daemon's server socket,
+      // but don't wait forever for the daemon to connect
+      serverSocket.setSoTimeout(1)
+      val socketToDaemon = serverSocket.accept()
```

So, the problem with TCP sockets is: what if some other malicious process connects here instead of the process you expect to? You can add an auth token and expose it to the child in an env variable, but that complicates the code a bit more. Sometimes I wish we could create proper pipes between processes in Java...

(You also probably want some `try..finally` error handling to close the server socket, at least.)
GitHub user bersprockets opened a pull request: https://github.com/apache/spark/pull/20519

[Spark-23240][python] Don't let python site customizations interfere with communication between PythonWorkerFactory and daemon.py

## What changes were proposed in this pull request?

Use a socket, instead of stdout, as the inter-process communication method between PythonWorkerFactory and a newly launched daemon.py, to avoid the issue described in the Jira (a case where python stdout can be polluted by extraneous messages).

## How was this patch tested?

- python/run-tests --modules "pyspark-core,pyspark-sql,pyspark-streaming"
- python/pyspark/tests.py
- all sbt tests
- selected queries on a Spark cluster that uses a python installation with site customizations

Note: the two python test scripts and the cluster test were run using Python 2.7 only.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/bersprockets/spark SPARK-23240_prop3

Alternatively, you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/20519.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #20519

commit e34f987712e1e607604cebafb38d31014902da72
Author: Bruce Robbins
Date: 2018-02-06T04:31:55Z

    Don't let python site customizations interfere with communication between PythonWorkerFactory and daemon.py
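The handshake this PR introduces (loopback server socket on the JVM side, auth token passed to the child via an environment variable, daemon connecting back to report its port) can be modeled end-to-end in a single Python script. Everything below is an illustrative model of the protocol, not the patch itself; all names are hypothetical, and both sides run in one process via a thread.

```python
import os
import socket
import struct
import threading

def daemon_side(parent_port, listen_port):
    # "daemon.py" side: read the token from the environment and send it,
    # plus our listen port, as two big-endian 4-byte ints.
    token = int(os.environ["PYSPARK_DAEMON_TOKEN"])
    with socket.create_connection(("127.0.0.1", parent_port)) as s:
        s.sendall(struct.pack(">ii", token, listen_port))

def parent_side(server, expected_token):
    # "PythonWorkerFactory" side: accept the callback connection and
    # verify the token before believing the reported port.
    conn, _ = server.accept()
    with conn:
        infile = conn.makefile(mode="rb")
        token = struct.unpack(">i", infile.read(4))[0]
        if token != expected_token:
            raise RuntimeError("unexpected auth token")
        return struct.unpack(">i", infile.read(4))[0]

def demo():
    # Bind a loopback-only server socket on an ephemeral port, hand the
    # token to the "child" via the environment, and run the handshake.
    server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    server.bind(("127.0.0.1", 0))
    server.listen(1)
    token = 42424242  # would come from a CSPRNG in practice
    os.environ["PYSPARK_DAEMON_TOKEN"] = str(token)
    t = threading.Thread(target=daemon_side,
                         args=(server.getsockname()[1], 4040))
    t.start()
    port = parent_side(server, token)
    t.join()
    server.close()
    return port
```

Since nothing travels over stdout, a site customization that prints a banner at interpreter startup cannot corrupt the port exchange, which is precisely the failure mode described in SPARK-23240.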