[GitHub] spark pull request #22085: [SPARK-25095][PySpark] Python support for Barrier...
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/22085#discussion_r213060868 --- Diff: core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala --- @@ -180,7 +188,73 @@ private[spark] abstract class BasePythonRunner[IN, OUT]( dataOut.writeInt(partitionIndex) // Python version of driver PythonRDD.writeUTF(pythonVer, dataOut) +// Init a ServerSocket to accept method calls from Python side. +val isBarrier = context.isInstanceOf[BarrierTaskContext] +if (isBarrier) { + serverSocket = Some(new ServerSocket(/* port */ 0, +/* backlog */ 1, +InetAddress.getByName("localhost"))) + // A call to accept() for ServerSocket shall block infinitely. + serverSocket.map(_.setSoTimeout(0)) + new Thread("accept-connections") { +setDaemon(true) + +override def run(): Unit = { + while (!serverSocket.get.isClosed()) { +var sock: Socket = null +try { + sock = serverSocket.get.accept() + // Wait for function call from python side. + sock.setSoTimeout(1) + val input = new DataInputStream(sock.getInputStream()) --- End diff -- ok I'm doing this -- SPARK-25253, will open a pr shortly --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22085: [SPARK-25095][PySpark] Python support for Barrier...
Github user jiangxb1987 commented on a diff in the pull request: https://github.com/apache/spark/pull/22085#discussion_r213050049 --- Diff: core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala --- @@ -180,7 +188,73 @@ private[spark] abstract class BasePythonRunner[IN, OUT]( dataOut.writeInt(partitionIndex) // Python version of driver PythonRDD.writeUTF(pythonVer, dataOut) +// Init a ServerSocket to accept method calls from Python side. +val isBarrier = context.isInstanceOf[BarrierTaskContext] +if (isBarrier) { + serverSocket = Some(new ServerSocket(/* port */ 0, +/* backlog */ 1, +InetAddress.getByName("localhost"))) + // A call to accept() for ServerSocket shall block infinitely. + serverSocket.map(_.setSoTimeout(0)) + new Thread("accept-connections") { +setDaemon(true) + +override def run(): Unit = { + while (!serverSocket.get.isClosed()) { +var sock: Socket = null +try { + sock = serverSocket.get.accept() + // Wait for function call from python side. + sock.setSoTimeout(1) + val input = new DataInputStream(sock.getInputStream()) --- End diff -- Thanks for catching this, yea I agree it would be better to move the authentication before recognising functions. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22085: [SPARK-25095][PySpark] Python support for Barrier...
Github user squito commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22085#discussion_r213043068

    --- Diff: core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala ---
    @@ -180,7 +188,73 @@ private[spark] abstract class BasePythonRunner[IN, OUT](
             dataOut.writeInt(partitionIndex)
             // Python version of driver
             PythonRDD.writeUTF(pythonVer, dataOut)
    +        // Init a ServerSocket to accept method calls from Python side.
    +        val isBarrier = context.isInstanceOf[BarrierTaskContext]
    +        if (isBarrier) {
    +          serverSocket = Some(new ServerSocket(/* port */ 0,
    +            /* backlog */ 1,
    +            InetAddress.getByName("localhost")))
    +          // A call to accept() for ServerSocket shall block infinitely.
    +          serverSocket.map(_.setSoTimeout(0))
    +          new Thread("accept-connections") {
    +            setDaemon(true)
    +
    +            override def run(): Unit = {
    +              while (!serverSocket.get.isClosed()) {
    +                var sock: Socket = null
    +                try {
    +                  sock = serverSocket.get.accept()
    +                  // Wait for function call from python side.
    +                  sock.setSoTimeout(1)
    +                  val input = new DataInputStream(sock.getInputStream())
    --- End diff --

    (I'd also like to do some refactoring of the socket setup code in python, and that can go further if we do authentication first here)
[GitHub] spark pull request #22085: [SPARK-25095][PySpark] Python support for Barrier...
Github user squito commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22085#discussion_r213032992

    --- Diff: core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala ---
    @@ -180,7 +188,73 @@ private[spark] abstract class BasePythonRunner[IN, OUT](
             dataOut.writeInt(partitionIndex)
             // Python version of driver
             PythonRDD.writeUTF(pythonVer, dataOut)
    +        // Init a ServerSocket to accept method calls from Python side.
    +        val isBarrier = context.isInstanceOf[BarrierTaskContext]
    +        if (isBarrier) {
    +          serverSocket = Some(new ServerSocket(/* port */ 0,
    +            /* backlog */ 1,
    +            InetAddress.getByName("localhost")))
    +          // A call to accept() for ServerSocket shall block infinitely.
    +          serverSocket.map(_.setSoTimeout(0))
    +          new Thread("accept-connections") {
    +            setDaemon(true)
    +
    +            override def run(): Unit = {
    +              while (!serverSocket.get.isClosed()) {
    +                var sock: Socket = null
    +                try {
    +                  sock = serverSocket.get.accept()
    +                  // Wait for function call from python side.
    +                  sock.setSoTimeout(1)
    +                  val input = new DataInputStream(sock.getInputStream())
    --- End diff --

    why isn't authentication the first thing which happens on this connection? I don't think anything bad can happen in this case, but it just makes it more likely we leave a security hole here later on.
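Editor's sketch of the ordering discussed in this comment: authenticate the peer before reading anything else from the connection, and only then read and dispatch the function code. This is a minimal illustration under stated assumptions, not the code from the PR. `read_with_length` and `handle_connection` are hypothetical names, and the framing (4-byte big-endian length, then bytes) is assumed. Only `BARRIER_FUNCTION = 1` comes from the diff.

```python
import hmac
import io
import struct

BARRIER_FUNCTION = 1  # function code taken from the taskcontext.py diff


def read_with_length(stream):
    """Read a 4-byte big-endian length followed by that many bytes (assumed framing)."""
    (length,) = struct.unpack("!i", stream.read(4))
    return stream.read(length)


def handle_connection(stream, expected_secret, handlers):
    """Hypothetical handler: check the shared secret first, then dispatch."""
    client_secret = read_with_length(stream)
    # Constant-time comparison; nothing else is read before this check passes.
    if not hmac.compare_digest(client_secret, expected_secret):
        raise PermissionError("authentication failed")
    (function_code,) = struct.unpack("!i", stream.read(4))
    if function_code not in handlers:
        raise ValueError("unrecognized function code: %d" % function_code)
    return handlers[function_code]()


# Usage with an in-memory stream standing in for the socket file.
secret = b"s3cret"
request = io.BytesIO(
    struct.pack("!i", len(secret)) + secret + struct.pack("!i", BARRIER_FUNCTION))
result = handle_connection(request, secret, {BARRIER_FUNCTION: lambda: "released"})
```

With this ordering, an unauthenticated peer never reaches the dispatch step, so adding more function codes later does not widen what such a peer can trigger.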
[GitHub] spark pull request #22085: [SPARK-25095][PySpark] Python support for Barrier...
Github user dbtsai commented on a diff in the pull request: https://github.com/apache/spark/pull/22085#discussion_r212762076 --- Diff: core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala --- @@ -180,7 +188,73 @@ private[spark] abstract class BasePythonRunner[IN, OUT]( dataOut.writeInt(partitionIndex) // Python version of driver PythonRDD.writeUTF(pythonVer, dataOut) +// Init a ServerSocket to accept method calls from Python side. +val isBarrier = context.isInstanceOf[BarrierTaskContext] +if (isBarrier) { + serverSocket = Some(new ServerSocket(/* port */ 0, +/* backlog */ 1, +InetAddress.getByName("localhost"))) + // A call to accept() for ServerSocket shall block infinitely. + serverSocket.map(_.setSoTimeout(0)) + new Thread("accept-connections") { +setDaemon(true) + +override def run(): Unit = { + while (!serverSocket.get.isClosed()) { +var sock: Socket = null +try { + sock = serverSocket.get.accept() + // Wait for function call from python side. + sock.setSoTimeout(1) + val input = new DataInputStream(sock.getInputStream()) + input.readInt() match { +case BarrierTaskContextMessageProtocol.BARRIER_FUNCTION => + // The barrier() function may wait infinitely, socket shall not timeout + // before the function finishes. + sock.setSoTimeout(0) + barrierAndServe(sock) + +case _ => + val out = new DataOutputStream(new BufferedOutputStream( +sock.getOutputStream)) + writeUTF(BarrierTaskContextMessageProtocol.ERROR_UNRECOGNIZED_FUNCTION, out) + } +} catch { + case e: SocketException if e.getMessage.contains("Socket closed") => +// It is possible that the ServerSocket is not closed, but the native socket +// has already been closed, we shall catch and silently ignore this case. +} finally { + if (sock != null) { +sock.close() + } +} + } +} + }.start() +} +val secret = if (isBarrier) { + authHelper.secret +} else { + "" +} +// Close ServerSocket on task completion. 
    +        serverSocket.foreach { server =>
    +          context.addTaskCompletionListener(_ => server.close())
    --- End diff --

    Addressed in https://github.com/apache/spark/pull/9
[GitHub] spark pull request #22085: [SPARK-25095][PySpark] Python support for Barrier...
Github user dbtsai commented on a diff in the pull request: https://github.com/apache/spark/pull/22085#discussion_r212755510 --- Diff: core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala --- @@ -180,7 +188,73 @@ private[spark] abstract class BasePythonRunner[IN, OUT]( dataOut.writeInt(partitionIndex) // Python version of driver PythonRDD.writeUTF(pythonVer, dataOut) +// Init a ServerSocket to accept method calls from Python side. +val isBarrier = context.isInstanceOf[BarrierTaskContext] +if (isBarrier) { + serverSocket = Some(new ServerSocket(/* port */ 0, +/* backlog */ 1, +InetAddress.getByName("localhost"))) + // A call to accept() for ServerSocket shall block infinitely. + serverSocket.map(_.setSoTimeout(0)) + new Thread("accept-connections") { +setDaemon(true) + +override def run(): Unit = { + while (!serverSocket.get.isClosed()) { +var sock: Socket = null +try { + sock = serverSocket.get.accept() + // Wait for function call from python side. + sock.setSoTimeout(1) + val input = new DataInputStream(sock.getInputStream()) + input.readInt() match { +case BarrierTaskContextMessageProtocol.BARRIER_FUNCTION => + // The barrier() function may wait infinitely, socket shall not timeout + // before the function finishes. + sock.setSoTimeout(0) + barrierAndServe(sock) + +case _ => + val out = new DataOutputStream(new BufferedOutputStream( +sock.getOutputStream)) + writeUTF(BarrierTaskContextMessageProtocol.ERROR_UNRECOGNIZED_FUNCTION, out) + } +} catch { + case e: SocketException if e.getMessage.contains("Socket closed") => +// It is possible that the ServerSocket is not closed, but the native socket +// has already been closed, we shall catch and silently ignore this case. +} finally { + if (sock != null) { +sock.close() + } +} + } +} + }.start() +} +val secret = if (isBarrier) { + authHelper.secret +} else { + "" +} +// Close ServerSocket on task completion. 
    +        serverSocket.foreach { server =>
    +          context.addTaskCompletionListener(_ => server.close())
    --- End diff --

    This is failing the Scala 2.12 build

    ```
    [error] /Users/d_tsai/dev/apache-spark/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala:242: ambiguous reference to overloaded definition,
    [error] both method addTaskCompletionListener in class TaskContext of type [U](f: org.apache.spark.TaskContext => U)org.apache.spark.TaskContext
    [error] and method addTaskCompletionListener in class TaskContext of type (listener: org.apache.spark.util.TaskCompletionListener)org.apache.spark.TaskContext
    [error] match argument types (org.apache.spark.TaskContext => Unit)
    [error] context.addTaskCompletionListener(_ => server.close())
    [error] ^
    [error] one error found
    [error] Compile failed at Aug 24, 2018 1:56:06 PM [31.582s]
    ```
[GitHub] spark pull request #22085: [SPARK-25095][PySpark] Python support for Barrier...
Github user HyukjinKwon commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22085#discussion_r212168597

    --- Diff: python/pyspark/taskcontext.py ---
    @@ -95,3 +99,143 @@ def getLocalProperty(self, key):
             Get a local property set upstream in the driver, or None if it is missing.
             """
             return self._localProperties.get(key, None)
    +
    +
    +BARRIER_FUNCTION = 1
    +
    +
    +def _load_from_socket(port, auth_secret):
    +    """
    +    Load data from a given socket, this is a blocking method thus only return when the socket
    +    connection has been closed.
    +
    +    This is copied from context.py, while modified the message protocol.
    --- End diff --

    It would be nicer if we can deduplicate it later.
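Editor's sketch of one way the duplicated connection logic could be factored out: a single helper that walks the `getaddrinfo` results for `localhost` and returns the first socket that connects, leaving the protocol-specific part to each caller. `connect_to_local_port` is a hypothetical name and this is not the refactoring that was eventually made in Spark. The loop body mirrors the `_load_from_socket` code quoted elsewhere in this thread.

```python
import socket


def connect_to_local_port(port, timeout=None):
    """Hypothetical shared helper: return a connected socket to localhost:port.

    Tries every address family that getaddrinfo reports (IPv4 and IPv6),
    as the quoted _load_from_socket does. timeout=None means blocking reads.
    """
    last_error = None
    for af, socktype, proto, _canonname, sa in socket.getaddrinfo(
            "localhost", port, socket.AF_UNSPEC, socket.SOCK_STREAM):
        sock = socket.socket(af, socktype, proto)
        try:
            sock.settimeout(timeout)
            sock.connect(sa)
            return sock
        except socket.error as e:
            # This address did not work; close it and try the next one.
            sock.close()
            last_error = e
    raise Exception("could not open socket: %s" % last_error)
```

Each caller would then wrap the returned socket with `makefile` and speak its own message protocol, so only the protocol code differs between the two modules.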
[GitHub] spark pull request #22085: [SPARK-25095][PySpark] Python support for Barrier...
Github user asfgit closed the pull request at:

    https://github.com/apache/spark/pull/22085
[GitHub] spark pull request #22085: [SPARK-25095][PySpark] Python support for Barrier...
Github user jiangxb1987 commented on a diff in the pull request: https://github.com/apache/spark/pull/22085#discussion_r211683369 --- Diff: python/pyspark/taskcontext.py --- @@ -95,3 +99,126 @@ def getLocalProperty(self, key): Get a local property set upstream in the driver, or None if it is missing. """ return self._localProperties.get(key, None) + + +def _load_from_socket(port, auth_secret): +""" +Load data from a given socket, this is a blocking method thus only return when the socket +connection has been closed. +""" +sock = None +# Support for both IPv4 and IPv6. +# On most of IPv6-ready systems, IPv6 will take precedence. +for res in socket.getaddrinfo("localhost", port, socket.AF_UNSPEC, socket.SOCK_STREAM): +af, socktype, proto, canonname, sa = res +sock = socket.socket(af, socktype, proto) +try: +# Do not allow timeout for socket reading operation. +sock.settimeout(None) +sock.connect(sa) +except socket.error: +sock.close() +sock = None +continue +break +if not sock: +raise Exception("could not open socket") + +sockfile = sock.makefile("rwb", 65536) +write_with_length("run".encode("utf-8"), sockfile) +sockfile.flush() +do_server_auth(sockfile, auth_secret) + +# The socket will be automatically closed when garbage-collected. +return UTF8Deserializer().loads(sockfile) + + +class BarrierTaskContext(TaskContext): + +""" +.. note:: Experimental + +A TaskContext with extra info and tooling for a barrier stage. To access the BarrierTaskContext +for a running task, use: +L{BarrierTaskContext.get()}. + +.. versionadded:: 2.4.0 +""" + +_port = None +_secret = None + +def __init__(self): +"""Construct a BarrierTaskContext, use get instead""" +pass + +@classmethod +def _getOrCreate(cls): +"""Internal function to get or create global BarrierTaskContext.""" +if cls._taskContext is None: --- End diff -- IIUC reuse python worker just means we start a python worker from a daemon thread, it shall not affect the input/output files related to worker.py. 
[GitHub] spark pull request #22085: [SPARK-25095][PySpark] Python support for Barrier...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/22085#discussion_r211460405 --- Diff: core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala --- @@ -20,15 +20,16 @@ package org.apache.spark.api.python import java.io._ import java.net._ import java.nio.charset.StandardCharsets +import java.nio.charset.StandardCharsets.UTF_8 import java.util.concurrent.atomic.AtomicBoolean import scala.collection.JavaConverters._ -import org.apache.spark._ +import org.apache.spark.{SparkException, _} import org.apache.spark.internal.Logging +import org.apache.spark.security.SocketAuthHelper import org.apache.spark.util._ - --- End diff -- tiny nit: I would remove this one back while addressing other comments. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22085: [SPARK-25095][PySpark] Python support for Barrier...
Github user mengxr commented on a diff in the pull request: https://github.com/apache/spark/pull/22085#discussion_r211360719 --- Diff: python/pyspark/taskcontext.py --- @@ -95,3 +99,126 @@ def getLocalProperty(self, key): Get a local property set upstream in the driver, or None if it is missing. """ return self._localProperties.get(key, None) + + +def _load_from_socket(port, auth_secret): +""" +Load data from a given socket, this is a blocking method thus only return when the socket +connection has been closed. +""" +sock = None +# Support for both IPv4 and IPv6. +# On most of IPv6-ready systems, IPv6 will take precedence. +for res in socket.getaddrinfo("localhost", port, socket.AF_UNSPEC, socket.SOCK_STREAM): +af, socktype, proto, canonname, sa = res +sock = socket.socket(af, socktype, proto) +try: +# Do not allow timeout for socket reading operation. +sock.settimeout(None) +sock.connect(sa) +except socket.error: +sock.close() +sock = None +continue +break +if not sock: +raise Exception("could not open socket") + +sockfile = sock.makefile("rwb", 65536) +write_with_length("run".encode("utf-8"), sockfile) +sockfile.flush() +do_server_auth(sockfile, auth_secret) + +# The socket will be automatically closed when garbage-collected. +return UTF8Deserializer().loads(sockfile) + + +class BarrierTaskContext(TaskContext): + +""" +.. note:: Experimental + +A TaskContext with extra info and tooling for a barrier stage. To access the BarrierTaskContext +for a running task, use: +L{BarrierTaskContext.get()}. + +.. versionadded:: 2.4.0 +""" + +_port = None +_secret = None + +def __init__(self): +"""Construct a BarrierTaskContext, use get instead""" +pass + +@classmethod +def _getOrCreate(cls): +"""Internal function to get or create global BarrierTaskContext.""" +if cls._taskContext is None: --- End diff -- Q: Does it handle python worker reuse? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
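Editor's sketch of why worker reuse matters for a class-level singleton like the one in this diff: when one Python process runs several tasks in sequence, class attributes survive from one task to the next, so per-task values such as the port and secret have to be set again at the start of each task. `ReusableContext` and `run_task` are hypothetical stand-ins, and the sketch makes no claim about how PySpark's worker actually sequences these calls.

```python
class ReusableContext(object):
    """Hypothetical stand-in for a per-process singleton such as BarrierTaskContext."""

    _instance = None
    _port = None
    _secret = None

    @classmethod
    def _get_or_create(cls):
        # The instance is created once per process and then reused.
        if cls._instance is None:
            cls._instance = cls()
        return cls._instance

    @classmethod
    def _initialize(cls, port, secret):
        # Has to run at the start of every task; otherwise a reused worker
        # would keep the previous task's port and secret.
        cls._port = port
        cls._secret = secret


def run_task(port, secret):
    """Simulate what a reused worker process does at the start of one task."""
    ctx = ReusableContext._get_or_create()
    ReusableContext._initialize(port, secret)
    return ctx, ctx._port, ctx._secret
```

Calling `run_task` twice in the same process returns the same context object both times, with the port and secret of the most recent call.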
[GitHub] spark pull request #22085: [SPARK-25095][PySpark] Python support for Barrier...
Github user mengxr commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22085#discussion_r211356245

    --- Diff: python/pyspark/taskcontext.py ---
    @@ -95,3 +99,126 @@ def getLocalProperty(self, key):
             Get a local property set upstream in the driver, or None if it is missing.
             """
             return self._localProperties.get(key, None)
    +
    +
    +def _load_from_socket(port, auth_secret):
    --- End diff --

    Should document how this is different from the one in `context.py`.
[GitHub] spark pull request #22085: [SPARK-25095][PySpark] Python support for Barrier...
Github user mengxr commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22085#discussion_r211359959

    --- Diff: core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala ---
    @@ -381,6 +465,20 @@ private[spark] abstract class BasePythonRunner[IN, OUT](
             }
           }
         }
    +
    +  def writeUTF(str: String, dataOut: DataOutputStream) {
    +    val bytes = str.getBytes(StandardCharsets.UTF_8)
    --- End diff --

    nit: `UTF_8` or always use `StandardCharsets`
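Editor's sketch of the length-prefixed UTF-8 framing that a helper like `writeUTF` typically implements. The diff shows only its first line, so the rest is an assumption: the sketch writes a 4-byte big-endian length (what Java's `DataOutputStream.writeInt` produces) followed by the encoded bytes. `write_utf` and `read_utf` are hypothetical names.

```python
import io
import struct


def write_utf(text, stream):
    """Write text as a 4-byte big-endian byte count followed by UTF-8 bytes."""
    data = text.encode("utf-8")
    # The prefix counts encoded bytes, not characters.
    stream.write(struct.pack("!i", len(data)))
    stream.write(data)


def read_utf(stream):
    """Read one string written by write_utf."""
    (length,) = struct.unpack("!i", stream.read(4))
    return stream.read(length).decode("utf-8")


# Round trip through an in-memory buffer standing in for the socket stream.
buffer = io.BytesIO()
write_utf("barrier \u2713", buffer)
buffer.seek(0)
decoded = read_utf(buffer)
```

Counting bytes after encoding matters for non-ASCII text, where the number of bytes and the number of characters differ.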
[GitHub] spark pull request #22085: [SPARK-25095][PySpark] Python support for Barrier...
Github user mengxr commented on a diff in the pull request: https://github.com/apache/spark/pull/22085#discussion_r211358615 --- Diff: core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala --- @@ -180,7 +190,61 @@ private[spark] abstract class BasePythonRunner[IN, OUT]( dataOut.writeInt(partitionIndex) // Python version of driver PythonRDD.writeUTF(pythonVer, dataOut) +// Init a ServerSocket to accept method calls from Python side. +val isBarrier = context.isInstanceOf[BarrierTaskContext] +if (isBarrier) { + serverSocket = Some(new ServerSocket(0, 1, InetAddress.getByName("localhost"))) + // A call to accept() for ServerSocket shall block infinitely. + serverSocket.map(_.setSoTimeout(0)) + new Thread("accept-connections") { +setDaemon(true) + +override def run(): Unit = { + while (!serverSocket.get.isClosed()) { +var sock: Socket = null +try { + sock = serverSocket.get.accept() + sock.setSoTimeout(1) + val cmdString = readUtf8(sock) + if (cmdString.equals("run")) { --- End diff -- If we do not expect any other command from the socket, we should throw an exception --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22085: [SPARK-25095][PySpark] Python support for Barrier...
Github user mengxr commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22085#discussion_r211356840

    --- Diff: core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala ---
    @@ -76,6 +77,15 @@ private[spark] abstract class BasePythonRunner[IN, OUT](
       // TODO: support accumulator in multiple UDF
       protected val accumulator = funcs.head.funcs.head.accumulator

    +  // Expose a ServerSocket to support method calls via socket from Python side.
    +  private[spark] var serverSocket: Option[ServerSocket] = None
    +
    +  // Authentication helper used when serving method calls via socket from Python side.
    +  private lazy val authHelper = {
    +    val conf = Option(SparkEnv.get).map(_.conf).getOrElse(new SparkConf())
    --- End diff --

    When does `SparkEnv.get` return null?
[GitHub] spark pull request #22085: [SPARK-25095][PySpark] Python support for Barrier...
Github user mengxr commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22085#discussion_r211355022

    --- Diff: core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala ---
    @@ -20,15 +20,16 @@ package org.apache.spark.api.python
     import java.io._
     import java.net._
     import java.nio.charset.StandardCharsets
    +import java.nio.charset.StandardCharsets.UTF_8
     import java.util.concurrent.atomic.AtomicBoolean

     import scala.collection.JavaConverters._

    -import org.apache.spark._
    +import org.apache.spark.{SparkException, _}
    --- End diff --

    `_` should include `SparkException` already
[GitHub] spark pull request #22085: [SPARK-25095][PySpark] Python support for Barrier...
Github user mengxr commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22085#discussion_r211359028

    --- Diff: core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala ---
    @@ -180,7 +190,61 @@ private[spark] abstract class BasePythonRunner[IN, OUT](
             dataOut.writeInt(partitionIndex)
             // Python version of driver
             PythonRDD.writeUTF(pythonVer, dataOut)
    +        // Init a ServerSocket to accept method calls from Python side.
    +        val isBarrier = context.isInstanceOf[BarrierTaskContext]
    +        if (isBarrier) {
    +          serverSocket = Some(new ServerSocket(0, 1, InetAddress.getByName("localhost")))
    +          // A call to accept() for ServerSocket shall block infinitely.
    +          serverSocket.map(_.setSoTimeout(0))
    +          new Thread("accept-connections") {
    +            setDaemon(true)
    +
    +            override def run(): Unit = {
    +              while (!serverSocket.get.isClosed()) {
    +                var sock: Socket = null
    +                try {
    +                  sock = serverSocket.get.accept()
    +                  sock.setSoTimeout(1)
    +                  val cmdString = readUtf8(sock)
    +                  if (cmdString.equals("run")) {
    +                    sock.setSoTimeout(0)
    +                    barrierAndServe(sock)
    +                  }
    +                } catch {
    +                  case _: SocketException =>
    --- End diff --

    Is this the timeout exception? I don't see any exception that we could silently ignore.
[GitHub] spark pull request #22085: [SPARK-25095][PySpark] Python support for Barrier...
Github user mengxr commented on a diff in the pull request: https://github.com/apache/spark/pull/22085#discussion_r211358743 --- Diff: core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala --- @@ -180,7 +190,61 @@ private[spark] abstract class BasePythonRunner[IN, OUT]( dataOut.writeInt(partitionIndex) // Python version of driver PythonRDD.writeUTF(pythonVer, dataOut) +// Init a ServerSocket to accept method calls from Python side. +val isBarrier = context.isInstanceOf[BarrierTaskContext] +if (isBarrier) { + serverSocket = Some(new ServerSocket(0, 1, InetAddress.getByName("localhost"))) + // A call to accept() for ServerSocket shall block infinitely. + serverSocket.map(_.setSoTimeout(0)) + new Thread("accept-connections") { +setDaemon(true) + +override def run(): Unit = { + while (!serverSocket.get.isClosed()) { +var sock: Socket = null +try { + sock = serverSocket.get.accept() + sock.setSoTimeout(1) --- End diff -- Should add a comment about this timeout. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
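Editor's sketch of the two-phase timeout this comment asks to have documented: a short timeout while waiting for the command, then no timeout once the potentially long blocking call starts. In the Java API, `setSoTimeout` takes milliseconds and `0` means wait indefinitely, so `setSoTimeout(1)` in the diff is a one millisecond read timeout. Python's `settimeout` takes seconds, and `None` means block indefinitely. `read_command_then_block` is a hypothetical name, not code from the PR.

```python
import socket


def read_command_then_block(sock, command_timeout_s=1.0):
    """Read a 4-byte command under a short timeout, then remove the timeout.

    Returns the raw command bytes, or None if nothing arrived in time.
    """
    # Phase 1: the peer is expected to send its command promptly.
    sock.settimeout(command_timeout_s)
    try:
        command = sock.recv(4)
    except socket.timeout:
        return None
    # Phase 2: the call that follows (such as a barrier) may wait a long
    # time, so reads on this socket must no longer time out.
    sock.settimeout(None)
    return command
```

Stating the unit and the reason next to each timeout call is the kind of comment the review is asking for.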
[GitHub] spark pull request #22085: [SPARK-25095][PySpark] Python support for Barrier...
Github user mengxr commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22085#discussion_r211357983

    --- Diff: core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala ---
    @@ -180,7 +190,61 @@ private[spark] abstract class BasePythonRunner[IN, OUT](
             dataOut.writeInt(partitionIndex)
             // Python version of driver
             PythonRDD.writeUTF(pythonVer, dataOut)
    +        // Init a ServerSocket to accept method calls from Python side.
    +        val isBarrier = context.isInstanceOf[BarrierTaskContext]
    +        if (isBarrier) {
    +          serverSocket = Some(new ServerSocket(0, 1, InetAddress.getByName("localhost")))
    --- End diff --

    minor: useful to add `/* port */` and `/* backlog */`
[GitHub] spark pull request #22085: [SPARK-25095][PySpark] Python support for Barrier...
Github user jiangxb1987 commented on a diff in the pull request: https://github.com/apache/spark/pull/22085#discussion_r211182337 --- Diff: python/pyspark/taskcontext.py --- @@ -95,3 +99,124 @@ def getLocalProperty(self, key): Get a local property set upstream in the driver, or None if it is missing. """ return self._localProperties.get(key, None) + + +def _load_from_socket(port, auth_secret): +""" +Load data from a given socket, this is a blocking method thus only return when the socket +connection has been closed. +""" +sock = None +# Support for both IPv4 and IPv6. +# On most of IPv6-ready systems, IPv6 will take precedence. +for res in socket.getaddrinfo("localhost", port, socket.AF_UNSPEC, socket.SOCK_STREAM): +af, socktype, proto, canonname, sa = res +sock = socket.socket(af, socktype, proto) +try: +# Do not allow timeout for socket reading operation. +sock.settimeout(None) +sock.connect(sa) +except socket.error: +sock.close() +sock = None +continue +break +if not sock: +raise Exception("could not open socket") + +sockfile = sock.makefile("rwb", 65536) +do_server_auth(sockfile, auth_secret) + +# The socket will be automatically closed when garbage-collected. +return UTF8Deserializer().loads(sockfile) + + +class BarrierTaskContext(TaskContext): + +""" +.. note:: Experimental + +A TaskContext with extra info and tooling for a barrier stage. To access the BarrierTaskContext +for a running task, use: +L{BarrierTaskContext.get()}. + +.. versionadded:: 2.4.0 +""" + +_port = None +_secret = None + +def __init__(self): +"""Construct a BarrierTaskContext, use get instead""" +pass + +@classmethod +def _getOrCreate(cls): +"""Internal function to get or create global BarrierTaskContext.""" +if cls._taskContext is None: +cls._taskContext = BarrierTaskContext() +return cls._taskContext + +@classmethod +def get(cls): +""" +Return the currently active BarrierTaskContext. This can be called inside of user functions +to access contextual information about running tasks. + +.. 
note:: Must be called on the worker, not the driver. Returns None if not initialized. +""" +return cls._taskContext + +@classmethod +def _initialize(cls, port, secret): +""" +Initialize BarrierTaskContext, other methods within BarrierTaskContext can only be called +after BarrierTaskContext is initialized. +""" +cls._port = port +cls._secret = secret + +def barrier(self): +""" +.. note:: Experimental + +Sets a global barrier and waits until all tasks in this stage hit this barrier. +Note this method is only allowed for a BarrierTaskContext. + +.. versionadded:: 2.4.0 +""" +if self._port is None or self._secret is None: +raise Exception("Not supported to call barrier() before initialize " + +"BarrierTaskContext.") +else: +_load_from_socket(self._port, self._secret) + +def getTaskInfos(self): --- End diff -- fixed --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22085: [SPARK-25095][PySpark] Python support for Barrier...
Github user HyukjinKwon commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22085#discussion_r210148558

    --- Diff: core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala ---
    @@ -180,7 +183,42 @@ private[spark] abstract class BasePythonRunner[IN, OUT](
             dataOut.writeInt(partitionIndex)
             // Python version of driver
             PythonRDD.writeUTF(pythonVer, dataOut)
    +        // Init a GatewayServer to port current BarrierTaskContext to Python side.
    +        val isBarrier = context.isInstanceOf[BarrierTaskContext]
    +        val secret = if (isBarrier) {
    +          Utils.createSecret(env.conf)
    +        } else {
    +          ""
    +        }
    +        val gatewayServer: Option[GatewayServer] = if (isBarrier) {
    +          Some(new GatewayServer.GatewayServerBuilder()
    +            .entryPoint(context.asInstanceOf[BarrierTaskContext])
    +            .authToken(secret)
    +            .javaPort(0)
    +            .callbackClient(GatewayServer.DEFAULT_PYTHON_PORT, GatewayServer.defaultAddress(),
    +              secret)
    +            .build())
    --- End diff --

    Not yet. I asked to hold it back for now, since adding another gateway here looks like a last resort, and I was wondering whether we can avoid targeting 2.4.0. If this blocks, please go ahead. I will check it later this weekend.
[GitHub] spark pull request #22085: [SPARK-25095][PySpark] Python support for Barrier...
Github user jiangxb1987 commented on a diff in the pull request: https://github.com/apache/spark/pull/22085#discussion_r209974729 --- Diff: core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala --- @@ -180,7 +183,42 @@ private[spark] abstract class BasePythonRunner[IN, OUT]( dataOut.writeInt(partitionIndex) // Python version of driver PythonRDD.writeUTF(pythonVer, dataOut) +// Init a GatewayServer to port current BarrierTaskContext to Python side. +val isBarrier = context.isInstanceOf[BarrierTaskContext] +val secret = if (isBarrier) { + Utils.createSecret(env.conf) +} else { + "" +} +val gatewayServer: Option[GatewayServer] = if (isBarrier) { + Some(new GatewayServer.GatewayServerBuilder() +.entryPoint(context.asInstanceOf[BarrierTaskContext]) +.authToken(secret) +.javaPort(0) +.callbackClient(GatewayServer.DEFAULT_PYTHON_PORT, GatewayServer.defaultAddress(), + secret) +.build()) --- End diff -- The major issue here is that we want to make the `barrier()` call blocking, the task shall wait until timeout or succeeded, do we have other ways to achieve this goal other than current approach here? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
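Editor's sketch of the blocking semantics described in this comment: every task waits at the barrier until all tasks have arrived, or fails once a timeout expires. It uses `threading.Barrier` inside one process purely as an analogy. The PR itself coordinates separate executor tasks through the JVM, which this sketch does not model, and `run_tasks_with_barrier` is a hypothetical name.

```python
import threading


def run_tasks_with_barrier(num_tasks, timeout_s):
    """Run num_tasks threads that all block at one barrier; return the event order."""
    barrier = threading.Barrier(num_tasks)
    events = []
    lock = threading.Lock()

    def task(task_id):
        with lock:
            events.append(("before", task_id))
        # Blocks until all num_tasks threads have called wait(), or raises
        # threading.BrokenBarrierError if timeout_s elapses first.
        barrier.wait(timeout=timeout_s)
        with lock:
            events.append(("after", task_id))

    threads = [threading.Thread(target=task, args=(i,)) for i in range(num_tasks)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return events
```

Because no thread passes `wait()` until the last one has reached it, every "before" event is recorded ahead of every "after" event, which is the guarantee a blocking `barrier()` call is meant to give.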
[GitHub] spark pull request #22085: [SPARK-25095][PySpark] Python support for Barrier...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/22085#discussion_r209862546 --- Diff: python/pyspark/taskcontext.py --- @@ -95,3 +95,92 @@ def getLocalProperty(self, key): Get a local property set upstream in the driver, or None if it is missing. """ return self._localProperties.get(key, None) + + +class BarrierTaskContext(TaskContext): + +""" +.. note:: Experimental + +A TaskContext with extra info and tooling for a barrier stage. To access the BarrierTaskContext +for a running task, use: +L{BarrierTaskContext.get()}. + +.. versionadded:: 2.4.0 +""" + +_barrierContext = None + +def __init__(self): +"""Construct a BarrierTaskContext, use get instead""" +pass --- End diff -- I'm okay as is. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22085: [SPARK-25095][PySpark] Python support for Barrier...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/22085#discussion_r209862336 --- Diff: python/pyspark/taskcontext.py --- @@ -95,3 +95,92 @@ def getLocalProperty(self, key): Get a local property set upstream in the driver, or None if it is missing. """ return self._localProperties.get(key, None) + + +class BarrierTaskContext(TaskContext): + +""" +.. note:: Experimental + +A TaskContext with extra info and tooling for a barrier stage. To access the BarrierTaskContext +for a running task, use: +L{BarrierTaskContext.get()}. + +.. versionadded:: 2.4.0 +""" + +_barrierContext = None + +def __init__(self): +"""Construct a BarrierTaskContext, use get instead""" +pass --- End diff -- Ah, this is called in `_getOrCreate`. Sorry, I read this too quickly. In this case, frankly, I think we can either remove this, since it is the same as the default constructor Python provides, or monkey patch it to disallow initialization (like we did for `ImageSchema`), but I guess we don't necessarily need to be super clever here.
[GitHub] spark pull request #22085: [SPARK-25095][PySpark] Python support for Barrier...
Github user jiangxb1987 commented on a diff in the pull request: https://github.com/apache/spark/pull/22085#discussion_r209853276 --- Diff: python/pyspark/taskcontext.py --- @@ -95,3 +95,92 @@ def getLocalProperty(self, key): Get a local property set upstream in the driver, or None if it is missing. """ return self._localProperties.get(key, None) + + +class BarrierTaskContext(TaskContext): + +""" +.. note:: Experimental + +A TaskContext with extra info and tooling for a barrier stage. To access the BarrierTaskContext +for a running task, use: +L{BarrierTaskContext.get()}. + +.. versionadded:: 2.4.0 +""" + +_barrierContext = None + +def __init__(self): +"""Construct a BarrierTaskContext, use get instead""" +pass --- End diff -- This just follows `TaskContext.__init__()`, shall we update both? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22085: [SPARK-25095][PySpark] Python support for Barrier...
Github user mengxr commented on a diff in the pull request: https://github.com/apache/spark/pull/22085#discussion_r209846054 --- Diff: python/pyspark/taskcontext.py --- @@ -95,3 +95,92 @@ def getLocalProperty(self, key): Get a local property set upstream in the driver, or None if it is missing. """ return self._localProperties.get(key, None) + + +class BarrierTaskContext(TaskContext): + +""" +.. note:: Experimental + +A TaskContext with extra info and tooling for a barrier stage. To access the BarrierTaskContext +for a running task, use: +L{BarrierTaskContext.get()}. + +.. versionadded:: 2.4.0 +""" + +_barrierContext = None + +def __init__(self): +"""Construct a BarrierTaskContext, use get instead""" +pass + +@classmethod +def _getOrCreate(cls): +"""Internal function to get or create global BarrierTaskContext.""" +if cls._taskContext is None: +cls._taskContext = BarrierTaskContext() +return cls._taskContext + +@classmethod +def get(cls): +""" +Return the currently active BarrierTaskContext. This can be called inside of user functions +to access contextual information about running tasks. + +.. note:: Must be called on the worker, not the driver. Returns None if not initialized. +""" +return cls._taskContext + +@classmethod +def _initialize(cls, ctx): +""" +Initialize BarrierTaskContext, other methods within BarrierTaskContext can only be called +after BarrierTaskContext is initialized. +""" +cls._barrierContext = ctx + +def barrier(self): +""" +.. note:: Experimental + +Sets a global barrier and waits until all tasks in this stage hit this barrier. +Note this method is only allowed for a BarrierTaskContext. + +.. versionadded:: 2.4.0 +""" +if self._barrierContext is None: +raise Exception("Not supported to call barrier() before initialize " + +"BarrierTaskContext.") +else: +self._barrierContext.barrier() + +def getTaskInfos(self): +""" +.. note:: Experimental + +Returns the all task infos in this barrier stage, the task infos are ordered by +partitionId. 
+Note this method is only allowed for a BarrierTaskContext. + +.. versionadded:: 2.4.0 +""" +if self._barrierContext is None: +raise Exception("Not supported to call getTaskInfos() before initialize " + +"BarrierTaskContext.") +else: +java_list = self._barrierContext.getTaskInfos() +return [BarrierTaskInfo(h) for h in java_list] + + +class BarrierTaskInfo(object): +""" +.. note:: Experimental + +Carries all task infos of a barrier task. + +.. versionadded:: 2.4.0 +""" + +def __init__(self, info): +self.address = info.address --- End diff -- * should be `info.address()` * better to rename `info` to `jobj` to make it clear this is from Java --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
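To illustrate the first point: with Py4J, a Java method accessed without parentheses yields a member object rather than its return value, so the wrapper has to call `address()`. Plain Python behaves analogously, since an uncalled method is just a bound method. A minimal sketch, assuming a stand-in class `FakeJavaTaskInfo` (hypothetical) in place of the real Py4J proxy:

```python
class FakeJavaTaskInfo(object):
    """Hypothetical stand-in for the Py4J proxy of a Java task info object."""

    def __init__(self, address):
        self._address = address

    def address(self):
        # On a real proxy this would be a call into the JVM.
        return self._address


class BarrierTaskInfo(object):
    """Python-side wrapper; `jobj` signals that the argument comes from Java."""

    def __init__(self, jobj):
        # Call the method; `jobj.address` alone would store the method
        # object itself, not the address string.
        self.address = jobj.address()
```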
[GitHub] spark pull request #22085: [SPARK-25095][PySpark] Python support for Barrier...
Github user mengxr commented on a diff in the pull request: https://github.com/apache/spark/pull/22085#discussion_r209846015 --- Diff: python/pyspark/taskcontext.py --- @@ -95,3 +95,92 @@ def getLocalProperty(self, key): Get a local property set upstream in the driver, or None if it is missing. """ return self._localProperties.get(key, None) + + +class BarrierTaskContext(TaskContext): + +""" +.. note:: Experimental + +A TaskContext with extra info and tooling for a barrier stage. To access the BarrierTaskContext +for a running task, use: +L{BarrierTaskContext.get()}. + +.. versionadded:: 2.4.0 +""" + +_barrierContext = None + +def __init__(self): +"""Construct a BarrierTaskContext, use get instead""" +pass + +@classmethod +def _getOrCreate(cls): +"""Internal function to get or create global BarrierTaskContext.""" +if cls._taskContext is None: +cls._taskContext = BarrierTaskContext() +return cls._taskContext + +@classmethod +def get(cls): +""" +Return the currently active BarrierTaskContext. This can be called inside of user functions +to access contextual information about running tasks. + +.. note:: Must be called on the worker, not the driver. Returns None if not initialized. +""" +return cls._taskContext + +@classmethod +def _initialize(cls, ctx): +""" +Initialize BarrierTaskContext, other methods within BarrierTaskContext can only be called +after BarrierTaskContext is initialized. +""" +cls._barrierContext = ctx + +def barrier(self): +""" +.. note:: Experimental + +Sets a global barrier and waits until all tasks in this stage hit this barrier. +Note this method is only allowed for a BarrierTaskContext. + +.. versionadded:: 2.4.0 +""" +if self._barrierContext is None: +raise Exception("Not supported to call barrier() before initialize " + +"BarrierTaskContext.") +else: +self._barrierContext.barrier() + +def getTaskInfos(self): +""" +.. note:: Experimental + +Returns the all task infos in this barrier stage, the task infos are ordered by +partitionId. 
+Note this method is only allowed for a BarrierTaskContext. + +.. versionadded:: 2.4.0 +""" +if self._barrierContext is None: +raise Exception("Not supported to call getTaskInfos() before initialize " + +"BarrierTaskContext.") +else: +java_list = self._barrierContext.getTaskInfos() +return [BarrierTaskInfo(h) for h in java_list] + + +class BarrierTaskInfo(object): +""" +.. note:: Experimental + +Carries all task infos of a barrier task. + +.. versionadded:: 2.4.0 +""" + +def __init__(self, info): +self.address = info.address --- End diff -- * should be `info.address` * better to rename `info` to `jobj` to make it clear this is from Java --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22085: [SPARK-25095][PySpark] Python support for Barrier...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/22085#discussion_r209835246 --- Diff: python/pyspark/taskcontext.py --- @@ -95,3 +95,92 @@ def getLocalProperty(self, key): Get a local property set upstream in the driver, or None if it is missing. """ return self._localProperties.get(key, None) + + +class BarrierTaskContext(TaskContext): + +""" +.. note:: Experimental + +A TaskContext with extra info and tooling for a barrier stage. To access the BarrierTaskContext +for a running task, use: +L{BarrierTaskContext.get()}. + +.. versionadded:: 2.4.0 +""" + +_barrierContext = None + +def __init__(self): +"""Construct a BarrierTaskContext, use get instead""" +pass --- End diff -- BTW, I would throw an exception if this method is meant to be banned.
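One way the "throw an exception" idea could look, as a rough sketch and not what the PR does: the constructor refuses direct calls, while the internal `_getOrCreate` path briefly lifts the restriction. The class name `BarrierTaskContextSketch` and the `_allow_init` flag are hypothetical.

```python
class BarrierTaskContextSketch(object):
    """Illustrative only: bans direct construction, allows _getOrCreate()."""

    _taskContext = None
    _allow_init = False  # hypothetical guard flag, not part of PySpark

    def __init__(self):
        if not type(self)._allow_init:
            raise RuntimeError(
                "Do not construct this class directly; use get() instead.")

    @classmethod
    def _getOrCreate(cls):
        """Internal path: temporarily allow construction of the singleton."""
        if cls._taskContext is None:
            cls._allow_init = True
            try:
                cls._taskContext = cls()
            finally:
                cls._allow_init = False
        return cls._taskContext

    @classmethod
    def get(cls):
        """Return the active context, or None if it was never created."""
        return cls._taskContext
```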
[GitHub] spark pull request #22085: [SPARK-25095][PySpark] Python support for Barrier...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/22085#discussion_r209834646 --- Diff: core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala --- @@ -180,7 +183,42 @@ private[spark] abstract class BasePythonRunner[IN, OUT]( dataOut.writeInt(partitionIndex) // Python version of driver PythonRDD.writeUTF(pythonVer, dataOut) +// Init a GatewayServer to port current BarrierTaskContext to Python side. +val isBarrier = context.isInstanceOf[BarrierTaskContext] +val secret = if (isBarrier) { + Utils.createSecret(env.conf) +} else { + "" +} +val gatewayServer: Option[GatewayServer] = if (isBarrier) { + Some(new GatewayServer.GatewayServerBuilder() +.entryPoint(context.asInstanceOf[BarrierTaskContext]) +.authToken(secret) +.javaPort(0) +.callbackClient(GatewayServer.DEFAULT_PYTHON_PORT, GatewayServer.defaultAddress(), + secret) +.build()) --- End diff -- BTW, this is a design change. We should probably update https://cwiki.apache.org/confluence/display/SPARK/PySpark+Internals too. Is it really required to open a gateway there?
[GitHub] spark pull request #22085: [SPARK-25095][PySpark] Python support for Barrier...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/22085#discussion_r209833972 --- Diff: core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala --- @@ -180,7 +183,42 @@ private[spark] abstract class BasePythonRunner[IN, OUT]( dataOut.writeInt(partitionIndex) // Python version of driver PythonRDD.writeUTF(pythonVer, dataOut) +// Init a GatewayServer to port current BarrierTaskContext to Python side. +val isBarrier = context.isInstanceOf[BarrierTaskContext] +val secret = if (isBarrier) { + Utils.createSecret(env.conf) +} else { + "" +} +val gatewayServer: Option[GatewayServer] = if (isBarrier) { + Some(new GatewayServer.GatewayServerBuilder() +.entryPoint(context.asInstanceOf[BarrierTaskContext]) +.authToken(secret) +.javaPort(0) +.callbackClient(GatewayServer.DEFAULT_PYTHON_PORT, GatewayServer.defaultAddress(), + secret) +.build()) --- End diff -- The main reasons are resource usage, the unusual access pattern via Py4J on the worker side, and the possibility of allowing JVM access within the Python worker. Judging from https://github.com/apache/spark/pull/22085#discussion_r209490553, it looks like overkill to launch a Java gateway just to allow calling a function. This pattern is pretty unusual: in such cases we usually send the data manually and read it on the Python side, as we do for `TaskContext`. Now it opens a gateway for each worker, if I am not mistaken, and I was wondering whether we can avoid this. Can you elaborate on why this is required? I haven't had enough time to look into this, so I was thinking of taking a look this weekend. This also opens up the possibility of JVM access from the worker side via `BarrierTaskContext`. For instance, I believe we could hack our way into the JVM from inside UDFs.
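The "send the data manually and read it on the Python side" pattern can be sketched roughly as follows. This is an illustration under assumptions, not PySpark's actual code: it uses big-endian 4-byte integers and length-prefixed UTF-8 strings, and the helpers `write_task_info` and `read_task_info`, as well as the choice of fields, are hypothetical.

```python
import io
import struct


def write_int(value, stream):
    """Write a big-endian 4-byte signed integer."""
    stream.write(struct.pack("!i", value))


def write_utf(text, stream):
    """Write a length-prefixed UTF-8 string."""
    data = text.encode("utf-8")
    write_int(len(data), stream)
    stream.write(data)


def read_int(stream):
    """Read a big-endian 4-byte signed integer."""
    raw = stream.read(4)
    if len(raw) < 4:
        raise EOFError("stream ended while reading an int")
    return struct.unpack("!i", raw)[0]


def read_utf(stream):
    """Read a length-prefixed UTF-8 string."""
    length = read_int(stream)
    return stream.read(length).decode("utf-8")


def write_task_info(stream, partition_id, addresses):
    """Hypothetical sender side: partition id, then a list of addresses."""
    write_int(partition_id, stream)
    write_int(len(addresses), stream)
    for address in addresses:
        write_utf(address, stream)


def read_task_info(stream):
    """Hypothetical receiver side: mirror of write_task_info."""
    partition_id = read_int(stream)
    count = read_int(stream)
    return partition_id, [read_utf(stream) for _ in range(count)]
```

This covers static data such as task infos; a blocking call like `barrier()` still needs some request/response channel, which is what the rest of the thread is about.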
[GitHub] spark pull request #22085: [SPARK-25095][PySpark] Python support for Barrier...
Github user mengxr commented on a diff in the pull request: https://github.com/apache/spark/pull/22085#discussion_r209830941 --- Diff: core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala --- @@ -180,7 +183,42 @@ private[spark] abstract class BasePythonRunner[IN, OUT]( dataOut.writeInt(partitionIndex) // Python version of driver PythonRDD.writeUTF(pythonVer, dataOut) +// Init a GatewayServer to port current BarrierTaskContext to Python side. +val isBarrier = context.isInstanceOf[BarrierTaskContext] +val secret = if (isBarrier) { + Utils.createSecret(env.conf) +} else { + "" +} +val gatewayServer: Option[GatewayServer] = if (isBarrier) { + Some(new GatewayServer.GatewayServerBuilder() +.entryPoint(context.asInstanceOf[BarrierTaskContext]) +.authToken(secret) +.javaPort(0) +.callbackClient(GatewayServer.DEFAULT_PYTHON_PORT, GatewayServer.defaultAddress(), + secret) +.build()) --- End diff -- @HyukjinKwon Could you elaborate on your concerns? Is it because of resource usage or security?
[GitHub] spark pull request #22085: [SPARK-25095][PySpark] Python support for Barrier...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/22085#discussion_r209491244 --- Diff: core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala --- @@ -180,7 +183,42 @@ private[spark] abstract class BasePythonRunner[IN, OUT]( dataOut.writeInt(partitionIndex) // Python version of driver PythonRDD.writeUTF(pythonVer, dataOut) +// Init a GatewayServer to port current BarrierTaskContext to Python side. +val isBarrier = context.isInstanceOf[BarrierTaskContext] +val secret = if (isBarrier) { + Utils.createSecret(env.conf) +} else { + "" +} +val gatewayServer: Option[GatewayServer] = if (isBarrier) { + Some(new GatewayServer.GatewayServerBuilder() +.entryPoint(context.asInstanceOf[BarrierTaskContext]) +.authToken(secret) +.javaPort(0) +.callbackClient(GatewayServer.DEFAULT_PYTHON_PORT, GatewayServer.defaultAddress(), + secret) +.build()) --- End diff -- If this necessarily has to target 2.4.0, don't let me block it, since it's a new feature and we could probably consider another approach later; but if we can avoid it, I would suggest avoiding it for now. Let me try to track down the design doc and the changes related to this. I think I need more time to check why it ended up like this and whether there's another way.
[GitHub] spark pull request #22085: [SPARK-25095][PySpark] Python support for Barrier...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/22085#discussion_r209491060 --- Diff: core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala --- @@ -180,7 +183,42 @@ private[spark] abstract class BasePythonRunner[IN, OUT]( dataOut.writeInt(partitionIndex) // Python version of driver PythonRDD.writeUTF(pythonVer, dataOut) +// Init a GatewayServer to port current BarrierTaskContext to Python side. +val isBarrier = context.isInstanceOf[BarrierTaskContext] +val secret = if (isBarrier) { + Utils.createSecret(env.conf) +} else { + "" +} +val gatewayServer: Option[GatewayServer] = if (isBarrier) { + Some(new GatewayServer.GatewayServerBuilder() +.entryPoint(context.asInstanceOf[BarrierTaskContext]) +.authToken(secret) +.javaPort(0) +.callbackClient(GatewayServer.DEFAULT_PYTHON_PORT, GatewayServer.defaultAddress(), + secret) +.build()) --- End diff -- Yea, I read it and understood that this is only initialised when the context is a `BarrierTaskContext`, but it is super weird that we start another Java gateway here. If it's a hard requirement, then I suspect a design issue. Should this be targeted to 2.4.0, @mengxr?
[GitHub] spark pull request #22085: [SPARK-25095][PySpark] Python support for Barrier...
Github user jiangxb1987 commented on a diff in the pull request: https://github.com/apache/spark/pull/22085#discussion_r209490553 --- Diff: core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala --- @@ -180,7 +183,42 @@ private[spark] abstract class BasePythonRunner[IN, OUT]( dataOut.writeInt(partitionIndex) // Python version of driver PythonRDD.writeUTF(pythonVer, dataOut) +// Init a GatewayServer to port current BarrierTaskContext to Python side. +val isBarrier = context.isInstanceOf[BarrierTaskContext] +val secret = if (isBarrier) { + Utils.createSecret(env.conf) +} else { + "" +} +val gatewayServer: Option[GatewayServer] = if (isBarrier) { + Some(new GatewayServer.GatewayServerBuilder() +.entryPoint(context.asInstanceOf[BarrierTaskContext]) +.authToken(secret) +.javaPort(0) +.callbackClient(GatewayServer.DEFAULT_PYTHON_PORT, GatewayServer.defaultAddress(), + secret) +.build()) --- End diff -- We have to port `BarrierTaskContext` from the Java side to the Python side; otherwise there is no way to call `BarrierTaskContext.barrier()` from Python. Of course, the Java gateway is only initiated when the context is a `BarrierTaskContext`.
[GitHub] spark pull request #22085: [SPARK-25095][PySpark] Python support for Barrier...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/22085#discussion_r209476191 --- Diff: core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala --- @@ -180,7 +183,42 @@ private[spark] abstract class BasePythonRunner[IN, OUT]( dataOut.writeInt(partitionIndex) // Python version of driver PythonRDD.writeUTF(pythonVer, dataOut) +// Init a GatewayServer to port current BarrierTaskContext to Python side. +val isBarrier = context.isInstanceOf[BarrierTaskContext] +val secret = if (isBarrier) { + Utils.createSecret(env.conf) +} else { + "" +} +val gatewayServer: Option[GatewayServer] = if (isBarrier) { + Some(new GatewayServer.GatewayServerBuilder() +.entryPoint(context.asInstanceOf[BarrierTaskContext]) +.authToken(secret) +.javaPort(0) +.callbackClient(GatewayServer.DEFAULT_PYTHON_PORT, GatewayServer.defaultAddress(), + secret) +.build()) --- End diff -- Wait, wait... are you guys sure about having another Java gateway for each worker? (Or did I read this code too quickly?) Can you elaborate on why this is needed? We should avoid this unless it's strictly necessary.
[GitHub] spark pull request #22085: [SPARK-25095][PySpark] Python support for Barrier...
Github user mengxr commented on a diff in the pull request: https://github.com/apache/spark/pull/22085#discussion_r209473946 --- Diff: core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala --- @@ -180,7 +183,42 @@ private[spark] abstract class BasePythonRunner[IN, OUT]( dataOut.writeInt(partitionIndex) // Python version of driver PythonRDD.writeUTF(pythonVer, dataOut) +// Init a GatewayServer to port current BarrierTaskContext to Python side. +val isBarrier = context.isInstanceOf[BarrierTaskContext] +val secret = if (isBarrier) { + Utils.createSecret(env.conf) +} else { + "" +} +val gatewayServer: Option[GatewayServer] = if (isBarrier) { + Some(new GatewayServer.GatewayServerBuilder() +.entryPoint(context.asInstanceOf[BarrierTaskContext]) +.authToken(secret) +.javaPort(0) +.callbackClient(GatewayServer.DEFAULT_PYTHON_PORT, GatewayServer.defaultAddress(), --- End diff -- Leave a TODO here. We do not have requests from Java to Python. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22085: [SPARK-25095][PySpark] Python support for Barrier...
Github user mengxr commented on a diff in the pull request: https://github.com/apache/spark/pull/22085#discussion_r209473919 --- Diff: python/pyspark/taskcontext.py --- @@ -95,3 +96,33 @@ def getLocalProperty(self, key): Get a local property set upstream in the driver, or None if it is missing. """ return self._localProperties.get(key, None) + +def barrier(self): +""" +.. note:: Experimental + +Sets a global barrier and waits until all tasks in this stage hit this barrier. +Note this method is only allowed for a BarrierTaskContext. + +.. versionadded:: 2.4.0 +""" +if self._javaContext is None: +raise Exception("Not supported to call barrier() inside a non-barrier task.") +else: +self._javaContext.barrier() + +def getTaskInfos(self): +""" +.. note:: Experimental + +Returns the all task infos in this barrier stage, the task infos are ordered by +partitionId. +Note this method is only allowed for a BarrierTaskContext. + +.. versionadded:: 2.4.0 +""" +if self._javaContext is None: +raise Exception("Not supported to call getTaskInfos() inside a non-barrier task.") +else: +java_list = self._javaContext.getTaskInfos() +return [h for h in java_list] --- End diff -- Create `BarrierTaskInfo` class and wrap it over Java object. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22085: [SPARK-25095][PySpark] Python support for Barrier...
Github user mengxr commented on a diff in the pull request: https://github.com/apache/spark/pull/22085#discussion_r209473887 --- Diff: python/pyspark/taskcontext.py --- @@ -95,3 +96,33 @@ def getLocalProperty(self, key): Get a local property set upstream in the driver, or None if it is missing. """ return self._localProperties.get(key, None) + +def barrier(self): --- End diff -- Create `BarrierTaskContext` that extends `TaskContext` and then move those two methods there. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
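A minimal sketch of the suggested split, assuming simplified stand-ins rather than the real PySpark classes: the barrier-only method lives on a subclass, so a plain `TaskContext` never exposes it. `FakeBarrierBackend` is a hypothetical placeholder for whatever object performs the actual JVM-side barrier.

```python
class TaskContext(object):
    """Simplified stand-in for pyspark.TaskContext (only what the sketch needs)."""

    _taskContext = None

    @classmethod
    def _getOrCreate(cls):
        if cls._taskContext is None:
            cls._taskContext = cls()
        return cls._taskContext


class FakeBarrierBackend(object):
    """Hypothetical stand-in for the JVM-side barrier context."""

    def __init__(self):
        self.calls = 0

    def barrier(self):
        self.calls += 1


class BarrierTaskContext(TaskContext):
    """Barrier-only methods live here instead of on TaskContext."""

    _barrierContext = None

    @classmethod
    def _initialize(cls, ctx):
        cls._barrierContext = ctx

    def barrier(self):
        if self._barrierContext is None:
            raise Exception("barrier() called before BarrierTaskContext "
                            "was initialized.")
        self._barrierContext.barrier()
```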
[GitHub] spark pull request #22085: [SPARK-25095][PySpark] Python support for Barrier...
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/22085#discussion_r209464591 --- Diff: python/pyspark/worker.py --- @@ -275,6 +280,10 @@ def main(infile, outfile): shuffle.DiskBytesSpilled = 0 _accumulatorRegistry.clear() +if isBarrier: +paras = GatewayParameters(port=boundPort, auth_token=secret, auto_convert=True) --- End diff -- Maybe `params` instead of `paras`? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22085: [SPARK-25095][PySpark] Python support for Barrier...
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/22085#discussion_r209464569 --- Diff: python/pyspark/worker.py --- @@ -261,6 +263,9 @@ def main(infile, outfile): # initialize global state taskContext = TaskContext._getOrCreate() +isBarrier = read_bool(infile) --- End diff -- Add a comment indicating that the following 3 inputs are only for barrier tasks?
[GitHub] spark pull request #22085: [SPARK-25095][PySpark] Python support for Barrier...
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/22085#discussion_r209464679 --- Diff: core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala --- @@ -180,7 +183,42 @@ private[spark] abstract class BasePythonRunner[IN, OUT]( dataOut.writeInt(partitionIndex) // Python version of driver PythonRDD.writeUTF(pythonVer, dataOut) +// Init a GatewayServer to port current BarrierTaskContext to Python side. +val isBarrier = context.isInstanceOf[BarrierTaskContext] +val secret = if (isBarrier) { + Utils.createSecret(env.conf) +} else { + "" +} +val gatewayServer: Option[GatewayServer] = if (isBarrier) { + Some(new GatewayServer.GatewayServerBuilder() +.entryPoint(context.asInstanceOf[BarrierTaskContext]) +.authToken(secret) +.javaPort(0) +.callbackClient(GatewayServer.DEFAULT_PYTHON_PORT, GatewayServer.defaultAddress(), + secret) +.build()) +} else { + None +} +gatewayServer.map(_.start()) +gatewayServer.foreach { server => + context.addTaskCompletionListener(_ => server.shutdown()) +} +val boundPort: Int = gatewayServer.map(_.getListeningPort).getOrElse(0) +if (boundPort == -1) { + val message = "GatewayServer to port BarrierTaskContext failed to bind to Java side." + logError(message) + throw new SparkException(message) +} else { + logDebug(s"Started GatewayServer to port BarrierTaskContext on port $boundPort.") +} // Write out the TaskContextInfo --- End diff -- This comment should be moved too. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22085: [SPARK-25095][PySpark] Python support for Barrier...
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/22085#discussion_r209464515 --- Diff: core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala --- @@ -180,7 +183,42 @@ private[spark] abstract class BasePythonRunner[IN, OUT]( dataOut.writeInt(partitionIndex) // Python version of driver PythonRDD.writeUTF(pythonVer, dataOut) +// Init a GatewayServer to port current BarrierTaskContext to Python side. +val isBarrier = context.isInstanceOf[BarrierTaskContext] +val secret = if (isBarrier) { + Utils.createSecret(env.conf) +} else { + "" +} +val gatewayServer: Option[GatewayServer] = if (isBarrier) { + Some(new GatewayServer.GatewayServerBuilder() +.entryPoint(context.asInstanceOf[BarrierTaskContext]) +.authToken(secret) +.javaPort(0) +.callbackClient(GatewayServer.DEFAULT_PYTHON_PORT, GatewayServer.defaultAddress(), + secret) +.build()) +} else { + None +} +gatewayServer.map(_.start()) +gatewayServer.foreach { server => + context.addTaskCompletionListener(_ => server.shutdown()) +} +val boundPort: Int = gatewayServer.map(_.getListeningPort).getOrElse(0) +if (boundPort == -1) { + val message = "GatewayServer to port BarrierTaskContext failed to bind to Java side." + logError(message) + throw new SparkException(message) +} else { + logDebug(s"Started GatewayServer to port BarrierTaskContext on port $boundPort.") --- End diff -- When `isBarrier` is false, I think we don't need to show this?
[GitHub] spark pull request #22085: [SPARK-25095][PySpark] Python support for Barrier...
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/22085#discussion_r209464621 --- Diff: python/pyspark/taskcontext.py --- @@ -29,6 +29,7 @@ class TaskContext(object): """ _taskContext = None +_javaContext = None --- End diff -- `_barrierContext`? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22085: [SPARK-25095][PySpark] Python support for Barrier...
GitHub user jiangxb1987 opened a pull request: https://github.com/apache/spark/pull/22085

[SPARK-25095][PySpark] Python support for BarrierTaskContext

## What changes were proposed in this pull request?

Add the methods `barrier()` and `getTaskInfos()` to the Python `TaskContext`; these two methods are only allowed in barrier tasks.

## How was this patch tested?

TBD

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/jiangxb1987/spark python.barrier

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/22085.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #22085

commit 7b488299709f715d344e5c38956577f31718ab34
Author: Xingbo Jiang
Date: 2018-08-12T16:04:20Z

    implement python barrier taskcontext