[GitHub] spark pull request #22085: [SPARK-25095][PySpark] Python support for Barrier...

2018-08-27 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/22085#discussion_r213060868
  
--- Diff: 
core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala ---
@@ -180,7 +188,73 @@ private[spark] abstract class BasePythonRunner[IN, 
OUT](
 dataOut.writeInt(partitionIndex)
 // Python version of driver
 PythonRDD.writeUTF(pythonVer, dataOut)
+// Init a ServerSocket to accept method calls from Python side.
+val isBarrier = context.isInstanceOf[BarrierTaskContext]
+if (isBarrier) {
+  serverSocket = Some(new ServerSocket(/* port */ 0,
+/* backlog */ 1,
+InetAddress.getByName("localhost")))
+  // A call to accept() for ServerSocket shall block infinitely.
+  serverSocket.map(_.setSoTimeout(0))
+  new Thread("accept-connections") {
+setDaemon(true)
+
+override def run(): Unit = {
+  while (!serverSocket.get.isClosed()) {
+var sock: Socket = null
+try {
+  sock = serverSocket.get.accept()
+  // Wait for function call from python side.
+  sock.setSoTimeout(1)
+  val input = new DataInputStream(sock.getInputStream())
--- End diff --

ok I'm doing this -- SPARK-25253, will open a pr shortly


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22085: [SPARK-25095][PySpark] Python support for Barrier...

2018-08-27 Thread jiangxb1987
Github user jiangxb1987 commented on a diff in the pull request:

https://github.com/apache/spark/pull/22085#discussion_r213050049
  
--- Diff: 
core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala ---
@@ -180,7 +188,73 @@ private[spark] abstract class BasePythonRunner[IN, 
OUT](
 dataOut.writeInt(partitionIndex)
 // Python version of driver
 PythonRDD.writeUTF(pythonVer, dataOut)
+// Init a ServerSocket to accept method calls from Python side.
+val isBarrier = context.isInstanceOf[BarrierTaskContext]
+if (isBarrier) {
+  serverSocket = Some(new ServerSocket(/* port */ 0,
+/* backlog */ 1,
+InetAddress.getByName("localhost")))
+  // A call to accept() for ServerSocket shall block infinitely.
+  serverSocket.map(_.setSoTimeout(0))
+  new Thread("accept-connections") {
+setDaemon(true)
+
+override def run(): Unit = {
+  while (!serverSocket.get.isClosed()) {
+var sock: Socket = null
+try {
+  sock = serverSocket.get.accept()
+  // Wait for function call from python side.
+  sock.setSoTimeout(1)
+  val input = new DataInputStream(sock.getInputStream())
--- End diff --

Thanks for catching this, yea I agree it would be better to move the 
authentication before recognising functions.





[GitHub] spark pull request #22085: [SPARK-25095][PySpark] Python support for Barrier...

2018-08-27 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/22085#discussion_r213043068
  
--- Diff: 
core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala ---
@@ -180,7 +188,73 @@ private[spark] abstract class BasePythonRunner[IN, 
OUT](
 dataOut.writeInt(partitionIndex)
 // Python version of driver
 PythonRDD.writeUTF(pythonVer, dataOut)
+// Init a ServerSocket to accept method calls from Python side.
+val isBarrier = context.isInstanceOf[BarrierTaskContext]
+if (isBarrier) {
+  serverSocket = Some(new ServerSocket(/* port */ 0,
+/* backlog */ 1,
+InetAddress.getByName("localhost")))
+  // A call to accept() for ServerSocket shall block infinitely.
+  serverSocket.map(_.setSoTimeout(0))
+  new Thread("accept-connections") {
+setDaemon(true)
+
+override def run(): Unit = {
+  while (!serverSocket.get.isClosed()) {
+var sock: Socket = null
+try {
+  sock = serverSocket.get.accept()
+  // Wait for function call from python side.
+  sock.setSoTimeout(1)
+  val input = new DataInputStream(sock.getInputStream())
--- End diff --

(I'd also like to do some refactoring of the socket setup code in Python, 
and that can go further if we do authentication first here)





[GitHub] spark pull request #22085: [SPARK-25095][PySpark] Python support for Barrier...

2018-08-27 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/22085#discussion_r213032992
  
--- Diff: 
core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala ---
@@ -180,7 +188,73 @@ private[spark] abstract class BasePythonRunner[IN, 
OUT](
 dataOut.writeInt(partitionIndex)
 // Python version of driver
 PythonRDD.writeUTF(pythonVer, dataOut)
+// Init a ServerSocket to accept method calls from Python side.
+val isBarrier = context.isInstanceOf[BarrierTaskContext]
+if (isBarrier) {
+  serverSocket = Some(new ServerSocket(/* port */ 0,
+/* backlog */ 1,
+InetAddress.getByName("localhost")))
+  // A call to accept() for ServerSocket shall block infinitely.
+  serverSocket.map(_.setSoTimeout(0))
+  new Thread("accept-connections") {
+setDaemon(true)
+
+override def run(): Unit = {
+  while (!serverSocket.get.isClosed()) {
+var sock: Socket = null
+try {
+  sock = serverSocket.get.accept()
+  // Wait for function call from python side.
+  sock.setSoTimeout(1)
+  val input = new DataInputStream(sock.getInputStream())
--- End diff --

Why isn't authentication the first thing that happens on this connection? I 
don't think anything bad can happen in this case, but the current ordering 
makes it more likely that we leave a security hole here later on.
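
A minimal sketch of the ordering being asked for, written in Python rather than taken from the PR: the handler verifies a shared secret before it reads anything else from the connection. The names `handle_connection`, `read_frame`, `send_request` and the length-prefixed framing are assumptions made for illustration only.

```python
import hmac
import socket
import struct

BARRIER_FUNCTION = 1  # function code, mirroring the constant in the diff


def read_exact(sock, n):
    """Read exactly n bytes or raise if the peer closes early."""
    buf = b""
    while len(buf) < n:
        chunk = sock.recv(n - len(buf))
        if not chunk:
            raise ConnectionError("peer closed the connection")
        buf += chunk
    return buf


def read_frame(sock):
    """Read one frame: 4-byte big-endian length followed by the payload."""
    (length,) = struct.unpack(">i", read_exact(sock, 4))
    return read_exact(sock, length)


def handle_connection(sock, secret):
    """Authenticate first; only then look at the requested function code."""
    client_secret = read_frame(sock)
    if not hmac.compare_digest(client_secret, secret):
        raise PermissionError("authentication failed")
    (code,) = struct.unpack(">i", read_exact(sock, 4))
    if code != BARRIER_FUNCTION:
        raise ValueError("unrecognized function code: %d" % code)
    return code


def send_request(sock, secret, code):
    """Hypothetical client side: secret frame first, then the function code."""
    sock.sendall(struct.pack(">i", len(secret)) + secret)
    sock.sendall(struct.pack(">i", code))
```

With this ordering, an unauthenticated peer never reaches the code that interprets function codes, so later additions to the protocol are covered by the same check.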





[GitHub] spark pull request #22085: [SPARK-25095][PySpark] Python support for Barrier...

2018-08-24 Thread dbtsai
Github user dbtsai commented on a diff in the pull request:

https://github.com/apache/spark/pull/22085#discussion_r212762076
  
--- Diff: 
core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala ---
@@ -180,7 +188,73 @@ private[spark] abstract class BasePythonRunner[IN, 
OUT](
 dataOut.writeInt(partitionIndex)
 // Python version of driver
 PythonRDD.writeUTF(pythonVer, dataOut)
+// Init a ServerSocket to accept method calls from Python side.
+val isBarrier = context.isInstanceOf[BarrierTaskContext]
+if (isBarrier) {
+  serverSocket = Some(new ServerSocket(/* port */ 0,
+/* backlog */ 1,
+InetAddress.getByName("localhost")))
+  // A call to accept() for ServerSocket shall block infinitely.
+  serverSocket.map(_.setSoTimeout(0))
+  new Thread("accept-connections") {
+setDaemon(true)
+
+override def run(): Unit = {
+  while (!serverSocket.get.isClosed()) {
+var sock: Socket = null
+try {
+  sock = serverSocket.get.accept()
+  // Wait for function call from python side.
+  sock.setSoTimeout(1)
+  val input = new DataInputStream(sock.getInputStream())
+  input.readInt() match {
+case 
BarrierTaskContextMessageProtocol.BARRIER_FUNCTION =>
+  // The barrier() function may wait infinitely, 
socket shall not timeout
+  // before the function finishes.
+  sock.setSoTimeout(0)
+  barrierAndServe(sock)
+
+case _ =>
+  val out = new DataOutputStream(new 
BufferedOutputStream(
+sock.getOutputStream))
+  
writeUTF(BarrierTaskContextMessageProtocol.ERROR_UNRECOGNIZED_FUNCTION, out)
+  }
+} catch {
+  case e: SocketException if e.getMessage.contains("Socket 
closed") =>
+// It is possible that the ServerSocket is not closed, 
but the native socket
+// has already been closed, we shall catch and 
silently ignore this case.
+} finally {
+  if (sock != null) {
+sock.close()
+  }
+}
+  }
+}
+  }.start()
+}
+val secret = if (isBarrier) {
+  authHelper.secret
+} else {
+  ""
+}
+// Close ServerSocket on task completion.
+serverSocket.foreach { server =>
+  context.addTaskCompletionListener(_ => server.close())
--- End diff --

Addressed in https://github.com/apache/spark/pull/9





[GitHub] spark pull request #22085: [SPARK-25095][PySpark] Python support for Barrier...

2018-08-24 Thread dbtsai
Github user dbtsai commented on a diff in the pull request:

https://github.com/apache/spark/pull/22085#discussion_r212755510
  
--- Diff: 
core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala ---
@@ -180,7 +188,73 @@ private[spark] abstract class BasePythonRunner[IN, 
OUT](
 dataOut.writeInt(partitionIndex)
 // Python version of driver
 PythonRDD.writeUTF(pythonVer, dataOut)
+// Init a ServerSocket to accept method calls from Python side.
+val isBarrier = context.isInstanceOf[BarrierTaskContext]
+if (isBarrier) {
+  serverSocket = Some(new ServerSocket(/* port */ 0,
+/* backlog */ 1,
+InetAddress.getByName("localhost")))
+  // A call to accept() for ServerSocket shall block infinitely.
+  serverSocket.map(_.setSoTimeout(0))
+  new Thread("accept-connections") {
+setDaemon(true)
+
+override def run(): Unit = {
+  while (!serverSocket.get.isClosed()) {
+var sock: Socket = null
+try {
+  sock = serverSocket.get.accept()
+  // Wait for function call from python side.
+  sock.setSoTimeout(1)
+  val input = new DataInputStream(sock.getInputStream())
+  input.readInt() match {
+case 
BarrierTaskContextMessageProtocol.BARRIER_FUNCTION =>
+  // The barrier() function may wait infinitely, 
socket shall not timeout
+  // before the function finishes.
+  sock.setSoTimeout(0)
+  barrierAndServe(sock)
+
+case _ =>
+  val out = new DataOutputStream(new 
BufferedOutputStream(
+sock.getOutputStream))
+  
writeUTF(BarrierTaskContextMessageProtocol.ERROR_UNRECOGNIZED_FUNCTION, out)
+  }
+} catch {
+  case e: SocketException if e.getMessage.contains("Socket 
closed") =>
+// It is possible that the ServerSocket is not closed, 
but the native socket
+// has already been closed, we shall catch and 
silently ignore this case.
+} finally {
+  if (sock != null) {
+sock.close()
+  }
+}
+  }
+}
+  }.start()
+}
+val secret = if (isBarrier) {
+  authHelper.secret
+} else {
+  ""
+}
+// Close ServerSocket on task completion.
+serverSocket.foreach { server =>
+  context.addTaskCompletionListener(_ => server.close())
--- End diff --

This is failing the Scala 2.12 build

```
[error] /Users/d_tsai/dev/apache-spark/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala:242: ambiguous reference to overloaded definition,
[error] both method addTaskCompletionListener in class TaskContext of type [U](f: org.apache.spark.TaskContext => U)org.apache.spark.TaskContext
[error] and  method addTaskCompletionListener in class TaskContext of type (listener: org.apache.spark.util.TaskCompletionListener)org.apache.spark.TaskContext
[error] match argument types (org.apache.spark.TaskContext => Unit)
[error]   context.addTaskCompletionListener(_ => server.close())
[error]   ^
[error] one error found
[error] Compile failed at Aug 24, 2018 1:56:06 PM [31.582s]
```





[GitHub] spark pull request #22085: [SPARK-25095][PySpark] Python support for Barrier...

2018-08-22 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/22085#discussion_r212168597
  
--- Diff: python/pyspark/taskcontext.py ---
@@ -95,3 +99,143 @@ def getLocalProperty(self, key):
 Get a local property set upstream in the driver, or None if it is 
missing.
 """
 return self._localProperties.get(key, None)
+
+
+BARRIER_FUNCTION = 1
+
+
+def _load_from_socket(port, auth_secret):
+"""
+Load data from a given socket, this is a blocking method thus only 
return when the socket
+connection has been closed.
+
+This is copied from context.py, while modified the message protocol.
--- End diff --

It would be nicer if we can deduplicate it later.
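
One possible shape for that deduplication, sketched under assumptions: a single helper owns the IPv4/IPv6 connect loop, and each caller layers its own protocol on top of the returned socket. The name `connect_local` is hypothetical and is not presented as an existing PySpark function.

```python
import socket


def connect_local(port, timeout=None):
    """Connect to localhost:port, trying every resolved address in turn.

    Hypothetical shared helper: the callers in context.py and
    taskcontext.py could use it instead of each carrying a copy of the loop.
    """
    last_error = None
    for af, socktype, proto, _canonname, sa in socket.getaddrinfo(
            "localhost", port, socket.AF_UNSPEC, socket.SOCK_STREAM):
        sock = None
        try:
            sock = socket.socket(af, socktype, proto)
            sock.settimeout(timeout)
            sock.connect(sa)
            return sock
        except socket.error as e:
            # Remember the failure and try the next address family.
            last_error = e
            if sock is not None:
                sock.close()
    raise Exception("could not open socket: %s" % last_error)
```

Callers that need authentication or a custom message protocol would then wrap the returned socket themselves, which keeps the protocol differences out of the shared code.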





[GitHub] spark pull request #22085: [SPARK-25095][PySpark] Python support for Barrier...

2018-08-21 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/spark/pull/22085





[GitHub] spark pull request #22085: [SPARK-25095][PySpark] Python support for Barrier...

2018-08-21 Thread jiangxb1987
Github user jiangxb1987 commented on a diff in the pull request:

https://github.com/apache/spark/pull/22085#discussion_r211683369
  
--- Diff: python/pyspark/taskcontext.py ---
@@ -95,3 +99,126 @@ def getLocalProperty(self, key):
 Get a local property set upstream in the driver, or None if it is 
missing.
 """
 return self._localProperties.get(key, None)
+
+
+def _load_from_socket(port, auth_secret):
+"""
+Load data from a given socket, this is a blocking method thus only 
return when the socket
+connection has been closed.
+"""
+sock = None
+# Support for both IPv4 and IPv6.
+# On most of IPv6-ready systems, IPv6 will take precedence.
+for res in socket.getaddrinfo("localhost", port, socket.AF_UNSPEC, 
socket.SOCK_STREAM):
+af, socktype, proto, canonname, sa = res
+sock = socket.socket(af, socktype, proto)
+try:
+# Do not allow timeout for socket reading operation.
+sock.settimeout(None)
+sock.connect(sa)
+except socket.error:
+sock.close()
+sock = None
+continue
+break
+if not sock:
+raise Exception("could not open socket")
+
+sockfile = sock.makefile("rwb", 65536)
+write_with_length("run".encode("utf-8"), sockfile)
+sockfile.flush()
+do_server_auth(sockfile, auth_secret)
+
+# The socket will be automatically closed when garbage-collected.
+return UTF8Deserializer().loads(sockfile)
+
+
+class BarrierTaskContext(TaskContext):
+
+"""
+.. note:: Experimental
+
+A TaskContext with extra info and tooling for a barrier stage. To 
access the BarrierTaskContext
+for a running task, use:
+L{BarrierTaskContext.get()}.
+
+.. versionadded:: 2.4.0
+"""
+
+_port = None
+_secret = None
+
+def __init__(self):
+"""Construct a BarrierTaskContext, use get instead"""
+pass
+
+@classmethod
+def _getOrCreate(cls):
+"""Internal function to get or create global BarrierTaskContext."""
+if cls._taskContext is None:
--- End diff --

IIUC, Python worker reuse just means that we start a Python worker from a 
daemon thread; it should not affect the input/output files related to 
worker.py.





[GitHub] spark pull request #22085: [SPARK-25095][PySpark] Python support for Barrier...

2018-08-20 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/22085#discussion_r211460405
  
--- Diff: 
core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala ---
@@ -20,15 +20,16 @@ package org.apache.spark.api.python
 import java.io._
 import java.net._
 import java.nio.charset.StandardCharsets
+import java.nio.charset.StandardCharsets.UTF_8
 import java.util.concurrent.atomic.AtomicBoolean
 
 import scala.collection.JavaConverters._
 
-import org.apache.spark._
+import org.apache.spark.{SparkException, _}
 import org.apache.spark.internal.Logging
+import org.apache.spark.security.SocketAuthHelper
 import org.apache.spark.util._
 
-
--- End diff --

tiny nit: I would revert this removal while addressing the other comments.





[GitHub] spark pull request #22085: [SPARK-25095][PySpark] Python support for Barrier...

2018-08-20 Thread mengxr
Github user mengxr commented on a diff in the pull request:

https://github.com/apache/spark/pull/22085#discussion_r211360719
  
--- Diff: python/pyspark/taskcontext.py ---
@@ -95,3 +99,126 @@ def getLocalProperty(self, key):
 Get a local property set upstream in the driver, or None if it is 
missing.
 """
 return self._localProperties.get(key, None)
+
+
+def _load_from_socket(port, auth_secret):
+"""
+Load data from a given socket, this is a blocking method thus only 
return when the socket
+connection has been closed.
+"""
+sock = None
+# Support for both IPv4 and IPv6.
+# On most of IPv6-ready systems, IPv6 will take precedence.
+for res in socket.getaddrinfo("localhost", port, socket.AF_UNSPEC, 
socket.SOCK_STREAM):
+af, socktype, proto, canonname, sa = res
+sock = socket.socket(af, socktype, proto)
+try:
+# Do not allow timeout for socket reading operation.
+sock.settimeout(None)
+sock.connect(sa)
+except socket.error:
+sock.close()
+sock = None
+continue
+break
+if not sock:
+raise Exception("could not open socket")
+
+sockfile = sock.makefile("rwb", 65536)
+write_with_length("run".encode("utf-8"), sockfile)
+sockfile.flush()
+do_server_auth(sockfile, auth_secret)
+
+# The socket will be automatically closed when garbage-collected.
+return UTF8Deserializer().loads(sockfile)
+
+
+class BarrierTaskContext(TaskContext):
+
+"""
+.. note:: Experimental
+
+A TaskContext with extra info and tooling for a barrier stage. To 
access the BarrierTaskContext
+for a running task, use:
+L{BarrierTaskContext.get()}.
+
+.. versionadded:: 2.4.0
+"""
+
+_port = None
+_secret = None
+
+def __init__(self):
+"""Construct a BarrierTaskContext, use get instead"""
+pass
+
+@classmethod
+def _getOrCreate(cls):
+"""Internal function to get or create global BarrierTaskContext."""
+if cls._taskContext is None:
--- End diff --

Q: Does it handle python worker reuse?
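
To make the concern concrete, here is a small sketch, not taken from the PR, of how class-level state behaves when one Python process serves several tasks in a row. `FakeBarrierContext` and `run_task` are hypothetical stand-ins; the only point is that class attributes survive between tasks unless each task re-initializes them.

```python
class FakeBarrierContext(object):
    """Hypothetical stand-in for a context that keeps its state on the class."""

    _instance = None
    _port = None
    _secret = None

    @classmethod
    def _get_or_create(cls):
        # Created once per process, then shared by every later task.
        if cls._instance is None:
            cls._instance = cls()
        return cls._instance

    @classmethod
    def _initialize(cls, port, secret):
        # Overwrites whatever the previous task left behind.
        cls._port = port
        cls._secret = secret


def run_task(port, secret):
    """Simulate one task running in a (possibly reused) worker process."""
    ctx = FakeBarrierContext._get_or_create()
    FakeBarrierContext._initialize(port, secret)
    return ctx, ctx._port


ctx1, port1 = run_task(40001, "secret-a")
ctx2, port2 = run_task(40002, "secret-b")
```

Both calls return the same object, so under reuse the second task only sees correct values because `_initialize` runs again before the context is used.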





[GitHub] spark pull request #22085: [SPARK-25095][PySpark] Python support for Barrier...

2018-08-20 Thread mengxr
Github user mengxr commented on a diff in the pull request:

https://github.com/apache/spark/pull/22085#discussion_r211356245
  
--- Diff: python/pyspark/taskcontext.py ---
@@ -95,3 +99,126 @@ def getLocalProperty(self, key):
 Get a local property set upstream in the driver, or None if it is 
missing.
 """
 return self._localProperties.get(key, None)
+
+
+def _load_from_socket(port, auth_secret):
--- End diff --

Should document how this is different from the one in `context.py`.





[GitHub] spark pull request #22085: [SPARK-25095][PySpark] Python support for Barrier...

2018-08-20 Thread mengxr
Github user mengxr commented on a diff in the pull request:

https://github.com/apache/spark/pull/22085#discussion_r211359959
  
--- Diff: 
core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala ---
@@ -381,6 +465,20 @@ private[spark] abstract class BasePythonRunner[IN, 
OUT](
   }
 }
   }
+
+  def writeUTF(str: String, dataOut: DataOutputStream) {
+val bytes = str.getBytes(StandardCharsets.UTF_8)
--- End diff --

nit: `UTF_8` or always use `StandardCharsets`





[GitHub] spark pull request #22085: [SPARK-25095][PySpark] Python support for Barrier...

2018-08-20 Thread mengxr
Github user mengxr commented on a diff in the pull request:

https://github.com/apache/spark/pull/22085#discussion_r211358615
  
--- Diff: 
core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala ---
@@ -180,7 +190,61 @@ private[spark] abstract class BasePythonRunner[IN, 
OUT](
 dataOut.writeInt(partitionIndex)
 // Python version of driver
 PythonRDD.writeUTF(pythonVer, dataOut)
+// Init a ServerSocket to accept method calls from Python side.
+val isBarrier = context.isInstanceOf[BarrierTaskContext]
+if (isBarrier) {
+  serverSocket = Some(new ServerSocket(0, 1, 
InetAddress.getByName("localhost")))
+  // A call to accept() for ServerSocket shall block infinitely.
+  serverSocket.map(_.setSoTimeout(0))
+  new Thread("accept-connections") {
+setDaemon(true)
+
+override def run(): Unit = {
+  while (!serverSocket.get.isClosed()) {
+var sock: Socket = null
+try {
+  sock = serverSocket.get.accept()
+  sock.setSoTimeout(1)
+  val cmdString = readUtf8(sock)
+  if (cmdString.equals("run")) {
--- End diff --

If we do not expect any other command from the socket, we should throw an 
exception
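
A short sketch of the "fail loudly" behavior, in Python for brevity; the `dispatch` helper and the handler table are assumptions for illustration and do not appear in the PR.

```python
def dispatch(command, handlers):
    """Run the handler registered for `command`, or raise if there is none."""
    try:
        handler = handlers[command]
    except KeyError:
        raise ValueError("unexpected command from socket: %r" % (command,))
    return handler()


# Only one command is expected on this socket.
handlers = {"run": lambda: "barrier reached"}
```

Anything other than `"run"` then surfaces as an error instead of being dropped without a trace.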





[GitHub] spark pull request #22085: [SPARK-25095][PySpark] Python support for Barrier...

2018-08-20 Thread mengxr
Github user mengxr commented on a diff in the pull request:

https://github.com/apache/spark/pull/22085#discussion_r211356840
  
--- Diff: core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala ---
@@ -76,6 +77,15 @@ private[spark] abstract class BasePythonRunner[IN, OUT](
   // TODO: support accumulator in multiple UDF
   protected val accumulator = funcs.head.funcs.head.accumulator
 
+  // Expose a ServerSocket to support method calls via socket from Python side.
+  private[spark] var serverSocket: Option[ServerSocket] = None
+
+  // Authentication helper used when serving method calls via socket from Python side.
+  private lazy val authHelper = {
+    val conf = Option(SparkEnv.get).map(_.conf).getOrElse(new SparkConf())
--- End diff --

When `SparkEnv.get` returns null?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22085: [SPARK-25095][PySpark] Python support for Barrier...

2018-08-20 Thread mengxr
Github user mengxr commented on a diff in the pull request:

https://github.com/apache/spark/pull/22085#discussion_r211355022
  
--- Diff: 
core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala ---
@@ -20,15 +20,16 @@ package org.apache.spark.api.python
 import java.io._
 import java.net._
 import java.nio.charset.StandardCharsets
+import java.nio.charset.StandardCharsets.UTF_8
 import java.util.concurrent.atomic.AtomicBoolean
 
 import scala.collection.JavaConverters._
 
-import org.apache.spark._
+import org.apache.spark.{SparkException, _}
--- End diff --

`_` should include `SparkException` already





[GitHub] spark pull request #22085: [SPARK-25095][PySpark] Python support for Barrier...

2018-08-20 Thread mengxr
Github user mengxr commented on a diff in the pull request:

https://github.com/apache/spark/pull/22085#discussion_r211359028
  
--- Diff: 
core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala ---
@@ -180,7 +190,61 @@ private[spark] abstract class BasePythonRunner[IN, 
OUT](
 dataOut.writeInt(partitionIndex)
 // Python version of driver
 PythonRDD.writeUTF(pythonVer, dataOut)
+// Init a ServerSocket to accept method calls from Python side.
+val isBarrier = context.isInstanceOf[BarrierTaskContext]
+if (isBarrier) {
+  serverSocket = Some(new ServerSocket(0, 1, 
InetAddress.getByName("localhost")))
+  // A call to accept() for ServerSocket shall block infinitely.
+  serverSocket.map(_.setSoTimeout(0))
+  new Thread("accept-connections") {
+setDaemon(true)
+
+override def run(): Unit = {
+  while (!serverSocket.get.isClosed()) {
+var sock: Socket = null
+try {
+  sock = serverSocket.get.accept()
+  sock.setSoTimeout(1)
+  val cmdString = readUtf8(sock)
+  if (cmdString.equals("run")) {
+sock.setSoTimeout(0)
+barrierAndServe(sock)
+  }
+} catch {
+  case _: SocketException =>
--- End diff --

Is this the timeout exception? I don't see any exception that we could 
silently ignore.
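
For reference, a read timeout and a closed socket surface as different exception types, so a handler can tell them apart instead of swallowing both. In Java, `SocketTimeoutException` is not a subclass of `SocketException`, so the `catch` in the diff would not match a timeout. The sketch below shows the same distinction in Python; `classify_read` is a hypothetical helper written only for this illustration.

```python
import socket


def classify_read(sock, nbytes=4):
    """Return 'data', 'eof', 'timeout' or 'closed' for one recv attempt."""
    try:
        data = sock.recv(nbytes)
    except socket.timeout:
        # Must come first: socket.timeout is itself a subclass of OSError.
        return "timeout"
    except OSError:
        # For example, the local socket object was already closed.
        return "closed"
    return "data" if data else "eof"
```

Handling each case explicitly makes it clear which failures are expected during shutdown and which ones should be reported.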


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22085: [SPARK-25095][PySpark] Python support for Barrier...

2018-08-20 Thread mengxr
Github user mengxr commented on a diff in the pull request:

https://github.com/apache/spark/pull/22085#discussion_r211358743
  
--- Diff: 
core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala ---
@@ -180,7 +190,61 @@ private[spark] abstract class BasePythonRunner[IN, 
OUT](
 dataOut.writeInt(partitionIndex)
 // Python version of driver
 PythonRDD.writeUTF(pythonVer, dataOut)
+// Init a ServerSocket to accept method calls from Python side.
+val isBarrier = context.isInstanceOf[BarrierTaskContext]
+if (isBarrier) {
+  serverSocket = Some(new ServerSocket(0, 1, 
InetAddress.getByName("localhost")))
+  // A call to accept() for ServerSocket shall block infinitely.
+  serverSocket.map(_.setSoTimeout(0))
+  new Thread("accept-connections") {
+setDaemon(true)
+
+override def run(): Unit = {
+  while (!serverSocket.get.isClosed()) {
+var sock: Socket = null
+try {
+  sock = serverSocket.get.accept()
+  sock.setSoTimeout(1)
--- End diff --

Should add a comment about this timeout.
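
As an illustration of the kind of comment that would help, here is a Python sketch of the same two-phase timeout pattern: a short timeout while waiting for the command, then no timeout while the blocking call is served. The helper name `read_command_then_block` and the one-byte command are assumptions. Note that the units differ between the two languages: Java's `setSoTimeout` takes milliseconds, while Python's `settimeout` takes seconds.

```python
import socket


def read_command_then_block(sock, command_timeout=1.0):
    """Read a 1-byte command under a short timeout, then drop the timeout."""
    # Short timeout: a client that connects but never sends a command
    # should not tie up the accept loop indefinitely.
    sock.settimeout(command_timeout)
    command = sock.recv(1)
    # No timeout: the call being served may legitimately block for a long
    # time, so later reads and writes must not time out.
    sock.settimeout(None)
    return command
```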


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22085: [SPARK-25095][PySpark] Python support for Barrier...

2018-08-20 Thread mengxr
Github user mengxr commented on a diff in the pull request:

https://github.com/apache/spark/pull/22085#discussion_r211357983
  
--- Diff: 
core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala ---
@@ -180,7 +190,61 @@ private[spark] abstract class BasePythonRunner[IN, 
OUT](
 dataOut.writeInt(partitionIndex)
 // Python version of driver
 PythonRDD.writeUTF(pythonVer, dataOut)
+// Init a ServerSocket to accept method calls from Python side.
+val isBarrier = context.isInstanceOf[BarrierTaskContext]
+if (isBarrier) {
+  serverSocket = Some(new ServerSocket(0, 1, 
InetAddress.getByName("localhost")))
--- End diff --

minor: useful to add `/* port */` and `/* backlog */` 





[GitHub] spark pull request #22085: [SPARK-25095][PySpark] Python support for Barrier...

2018-08-20 Thread jiangxb1987
Github user jiangxb1987 commented on a diff in the pull request:

https://github.com/apache/spark/pull/22085#discussion_r211182337
  
--- Diff: python/pyspark/taskcontext.py ---
@@ -95,3 +99,124 @@ def getLocalProperty(self, key):
 Get a local property set upstream in the driver, or None if it is 
missing.
 """
 return self._localProperties.get(key, None)
+
+
+def _load_from_socket(port, auth_secret):
+"""
+Load data from a given socket, this is a blocking method thus only 
return when the socket
+connection has been closed.
+"""
+sock = None
+# Support for both IPv4 and IPv6.
+# On most of IPv6-ready systems, IPv6 will take precedence.
+for res in socket.getaddrinfo("localhost", port, socket.AF_UNSPEC, 
socket.SOCK_STREAM):
+af, socktype, proto, canonname, sa = res
+sock = socket.socket(af, socktype, proto)
+try:
+# Do not allow timeout for socket reading operation.
+sock.settimeout(None)
+sock.connect(sa)
+except socket.error:
+sock.close()
+sock = None
+continue
+break
+if not sock:
+raise Exception("could not open socket")
+
+sockfile = sock.makefile("rwb", 65536)
+do_server_auth(sockfile, auth_secret)
+
+# The socket will be automatically closed when garbage-collected.
+return UTF8Deserializer().loads(sockfile)
+
+
+class BarrierTaskContext(TaskContext):
+
+"""
+.. note:: Experimental
+
+A TaskContext with extra info and tooling for a barrier stage. To 
access the BarrierTaskContext
+for a running task, use:
+L{BarrierTaskContext.get()}.
+
+.. versionadded:: 2.4.0
+"""
+
+_port = None
+_secret = None
+
+def __init__(self):
+"""Construct a BarrierTaskContext, use get instead"""
+pass
+
+@classmethod
+def _getOrCreate(cls):
+"""Internal function to get or create global BarrierTaskContext."""
+if cls._taskContext is None:
+cls._taskContext = BarrierTaskContext()
+return cls._taskContext
+
+@classmethod
+def get(cls):
+"""
+Return the currently active BarrierTaskContext. This can be called 
inside of user functions
+to access contextual information about running tasks.
+
+.. note:: Must be called on the worker, not the driver. Returns 
None if not initialized.
+"""
+return cls._taskContext
+
+@classmethod
+def _initialize(cls, port, secret):
+"""
+Initialize BarrierTaskContext, other methods within 
BarrierTaskContext can only be called
+after BarrierTaskContext is initialized.
+"""
+cls._port = port
+cls._secret = secret
+
+def barrier(self):
+"""
+.. note:: Experimental
+
+Sets a global barrier and waits until all tasks in this stage hit 
this barrier.
+Note this method is only allowed for a BarrierTaskContext.
+
+.. versionadded:: 2.4.0
+"""
+if self._port is None or self._secret is None:
+raise Exception("Not supported to call barrier() before 
initialize " +
+"BarrierTaskContext.")
+else:
+_load_from_socket(self._port, self._secret)
+
+def getTaskInfos(self):
--- End diff --

fixed





[GitHub] spark pull request #22085: [SPARK-25095][PySpark] Python support for Barrier...

2018-08-14 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/22085#discussion_r210148558
  
--- Diff: 
core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala ---
@@ -180,7 +183,42 @@ private[spark] abstract class BasePythonRunner[IN, 
OUT](
 dataOut.writeInt(partitionIndex)
 // Python version of driver
 PythonRDD.writeUTF(pythonVer, dataOut)
+// Init a GatewayServer to port current BarrierTaskContext to 
Python side.
+val isBarrier = context.isInstanceOf[BarrierTaskContext]
+val secret = if (isBarrier) {
+  Utils.createSecret(env.conf)
+} else {
+  ""
+}
+val gatewayServer: Option[GatewayServer] = if (isBarrier) {
+  Some(new GatewayServer.GatewayServerBuilder()
+.entryPoint(context.asInstanceOf[BarrierTaskContext])
+.authToken(secret)
+.javaPort(0)
+.callbackClient(GatewayServer.DEFAULT_PYTHON_PORT, 
GatewayServer.defaultAddress(),
+  secret)
+.build())
--- End diff --

Not yet. I asked to hold this back for now, since adding another gateway 
here looks like a last resort, and I was wondering whether we can avoid 
targeting 2.4.0. If this is a blocker, please go ahead. I will check it 
later this weekend.





[GitHub] spark pull request #22085: [SPARK-25095][PySpark] Python support for Barrier...

2018-08-14 Thread jiangxb1987
Github user jiangxb1987 commented on a diff in the pull request:

https://github.com/apache/spark/pull/22085#discussion_r209974729
  
--- Diff: 
core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala ---
@@ -180,7 +183,42 @@ private[spark] abstract class BasePythonRunner[IN, 
OUT](
 dataOut.writeInt(partitionIndex)
 // Python version of driver
 PythonRDD.writeUTF(pythonVer, dataOut)
+// Init a GatewayServer to port current BarrierTaskContext to 
Python side.
+val isBarrier = context.isInstanceOf[BarrierTaskContext]
+val secret = if (isBarrier) {
+  Utils.createSecret(env.conf)
+} else {
+  ""
+}
+val gatewayServer: Option[GatewayServer] = if (isBarrier) {
+  Some(new GatewayServer.GatewayServerBuilder()
+.entryPoint(context.asInstanceOf[BarrierTaskContext])
+.authToken(secret)
+.javaPort(0)
+.callbackClient(GatewayServer.DEFAULT_PYTHON_PORT, 
GatewayServer.defaultAddress(),
+  secret)
+.build())
--- End diff --

The major issue here is that we want the `barrier()` call to be blocking: 
the task should wait until the call either succeeds or times out. Is there 
another way to achieve this besides the current approach?
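
For illustration only, here is a minimal sketch of a blocking call with a 
timeout over a plain socket, which is one possible alternative to a full 
gateway. It is not code from this PR: `fake_jvm_side`, `blocking_barrier`, 
`run_demo` and the one-byte request/reply protocol are all hypothetical, and a 
`socket.socketpair()` stands in for the real JVM-to-worker connection.

```python
import socket
import threading

SUCCESS = b"\x01"  # hypothetical one-byte status code


def fake_jvm_side(conn, release_event):
    """Stand-in for the JVM: read the request, wait for the barrier, reply."""
    conn.recv(1)            # one-byte "barrier" request from the worker
    release_event.wait()    # simulates waiting until all tasks arrive
    conn.sendall(SUCCESS)


def blocking_barrier(conn, timeout_secs):
    """Worker side: send the request, then block until a reply or a timeout."""
    conn.settimeout(timeout_secs)
    conn.sendall(b"\x00")
    try:
        return conn.recv(1) == SUCCESS
    except socket.timeout:
        raise RuntimeError("barrier() timed out after %s seconds" % timeout_secs)


def run_demo(release_first):
    """Run one barrier call; release_first controls whether it can succeed."""
    worker_end, jvm_end = socket.socketpair()
    release = threading.Event()
    if release_first:
        release.set()
    t = threading.Thread(target=fake_jvm_side, args=(jvm_end, release), daemon=True)
    t.start()
    try:
        return blocking_barrier(worker_end, timeout_secs=0.2)
    finally:
        release.set()   # let the helper thread finish in the timeout case
        t.join()
        worker_end.close()
        jvm_end.close()
```

With `run_demo(True)` the call returns once the reply arrives. With 
`run_demo(False)` the worker side gives up after the timeout and raises.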


---




[GitHub] spark pull request #22085: [SPARK-25095][PySpark] Python support for Barrier...

2018-08-14 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/22085#discussion_r209862546
  
--- Diff: python/pyspark/taskcontext.py ---
@@ -95,3 +95,92 @@ def getLocalProperty(self, key):
 Get a local property set upstream in the driver, or None if it is 
missing.
 """
 return self._localProperties.get(key, None)
+
+
+class BarrierTaskContext(TaskContext):
+
+"""
+.. note:: Experimental
+
+A TaskContext with extra info and tooling for a barrier stage. To 
access the BarrierTaskContext
+for a running task, use:
+L{BarrierTaskContext.get()}.
+
+.. versionadded:: 2.4.0
+"""
+
+_barrierContext = None
+
+def __init__(self):
+"""Construct a BarrierTaskContext, use get instead"""
+pass
--- End diff --

I'm okay as is.


---




[GitHub] spark pull request #22085: [SPARK-25095][PySpark] Python support for Barrier...

2018-08-14 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/22085#discussion_r209862336
  
--- Diff: python/pyspark/taskcontext.py ---
@@ -95,3 +95,92 @@ def getLocalProperty(self, key):
 Get a local property set upstream in the driver, or None if it is 
missing.
 """
 return self._localProperties.get(key, None)
+
+
+class BarrierTaskContext(TaskContext):
+
+"""
+.. note:: Experimental
+
+A TaskContext with extra info and tooling for a barrier stage. To 
access the BarrierTaskContext
+for a running task, use:
+L{BarrierTaskContext.get()}.
+
+.. versionadded:: 2.4.0
+"""
+
+_barrierContext = None
+
+def __init__(self):
+"""Construct a BarrierTaskContext, use get instead"""
+pass
--- End diff --

Ah, this is called in `_getOrCreate`. Sorry, I read it too quickly. In that 
case, frankly, I think we can either remove it, since Python injects the same 
default constructor anyway, or monkey patch it to disallow initialization (as 
we did for `ImageSchema`). But I guess we don't need to be super clever here.


---




[GitHub] spark pull request #22085: [SPARK-25095][PySpark] Python support for Barrier...

2018-08-14 Thread jiangxb1987
Github user jiangxb1987 commented on a diff in the pull request:

https://github.com/apache/spark/pull/22085#discussion_r209853276
  
--- Diff: python/pyspark/taskcontext.py ---
@@ -95,3 +95,92 @@ def getLocalProperty(self, key):
 Get a local property set upstream in the driver, or None if it is 
missing.
 """
 return self._localProperties.get(key, None)
+
+
+class BarrierTaskContext(TaskContext):
+
+"""
+.. note:: Experimental
+
+A TaskContext with extra info and tooling for a barrier stage. To 
access the BarrierTaskContext
+for a running task, use:
+L{BarrierTaskContext.get()}.
+
+.. versionadded:: 2.4.0
+"""
+
+_barrierContext = None
+
+def __init__(self):
+"""Construct a BarrierTaskContext, use get instead"""
+pass
--- End diff --

This just follows `TaskContext.__init__()`. Shall we update both?


---




[GitHub] spark pull request #22085: [SPARK-25095][PySpark] Python support for Barrier...

2018-08-14 Thread mengxr
Github user mengxr commented on a diff in the pull request:

https://github.com/apache/spark/pull/22085#discussion_r209846054
  
--- Diff: python/pyspark/taskcontext.py ---
@@ -95,3 +95,92 @@ def getLocalProperty(self, key):
 Get a local property set upstream in the driver, or None if it is 
missing.
 """
 return self._localProperties.get(key, None)
+
+
+class BarrierTaskContext(TaskContext):
+
+"""
+.. note:: Experimental
+
+A TaskContext with extra info and tooling for a barrier stage. To 
access the BarrierTaskContext
+for a running task, use:
+L{BarrierTaskContext.get()}.
+
+.. versionadded:: 2.4.0
+"""
+
+_barrierContext = None
+
+def __init__(self):
+"""Construct a BarrierTaskContext, use get instead"""
+pass
+
+@classmethod
+def _getOrCreate(cls):
+"""Internal function to get or create global BarrierTaskContext."""
+if cls._taskContext is None:
+cls._taskContext = BarrierTaskContext()
+return cls._taskContext
+
+@classmethod
+def get(cls):
+"""
+Return the currently active BarrierTaskContext. This can be called 
inside of user functions
+to access contextual information about running tasks.
+
+.. note:: Must be called on the worker, not the driver. Returns 
None if not initialized.
+"""
+return cls._taskContext
+
+@classmethod
+def _initialize(cls, ctx):
+"""
+Initialize BarrierTaskContext, other methods within 
BarrierTaskContext can only be called
+after BarrierTaskContext is initialized.
+"""
+cls._barrierContext = ctx
+
+def barrier(self):
+"""
+.. note:: Experimental
+
+Sets a global barrier and waits until all tasks in this stage hit 
this barrier.
+Note this method is only allowed for a BarrierTaskContext.
+
+.. versionadded:: 2.4.0
+"""
+if self._barrierContext is None:
+raise Exception("Not supported to call barrier() before 
initialize " +
+"BarrierTaskContext.")
+else:
+self._barrierContext.barrier()
+
+def getTaskInfos(self):
+"""
+.. note:: Experimental
+
+Returns the all task infos in this barrier stage, the task infos 
are ordered by
+partitionId.
+Note this method is only allowed for a BarrierTaskContext.
+
+.. versionadded:: 2.4.0
+"""
+if self._barrierContext is None:
+raise Exception("Not supported to call getTaskInfos() before 
initialize " +
+"BarrierTaskContext.")
+else:
+java_list = self._barrierContext.getTaskInfos()
+return [BarrierTaskInfo(h) for h in java_list]
+
+
+class BarrierTaskInfo(object):
+"""
+.. note:: Experimental
+
+Carries all task infos of a barrier task.
+
+.. versionadded:: 2.4.0
+"""
+
+def __init__(self, info):
+self.address = info.address
--- End diff --

* should be `info.address()`
* better to rename `info` to `jobj` to make it clear this is from Java
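
A minimal sketch of the two suggestions combined, assuming the Java-side 
object exposes `address` as a method (as a Py4J proxy would for a Scala `val`). 
`FakeJavaTaskInfo` is a hypothetical stand-in so the example runs without a 
JVM.

```python
class FakeJavaTaskInfo(object):
    """Hypothetical stand-in for the Py4J proxy of the Java task info."""

    def __init__(self, address):
        self._address = address

    def address(self):
        # On a real proxy this would be a remote method call into the JVM.
        return self._address


class BarrierTaskInfo(object):
    """Python-side wrapper that copies the address out of the Java object."""

    def __init__(self, jobj):
        # Call the accessor; `jobj.address` alone would store the method itself.
        self.address = jobj.address()


java_list = [FakeJavaTaskInfo("host-a:1234"), FakeJavaTaskInfo("host-b:5678")]
infos = [BarrierTaskInfo(j) for j in java_list]
```

Without the parentheses, `self.address` would hold a callable rather than the 
address string.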


---




[GitHub] spark pull request #22085: [SPARK-25095][PySpark] Python support for Barrier...

2018-08-14 Thread mengxr
Github user mengxr commented on a diff in the pull request:

https://github.com/apache/spark/pull/22085#discussion_r209846015
  
--- Diff: python/pyspark/taskcontext.py ---
@@ -95,3 +95,92 @@ def getLocalProperty(self, key):
 Get a local property set upstream in the driver, or None if it is 
missing.
 """
 return self._localProperties.get(key, None)
+
+
+class BarrierTaskContext(TaskContext):
+
+"""
+.. note:: Experimental
+
+A TaskContext with extra info and tooling for a barrier stage. To 
access the BarrierTaskContext
+for a running task, use:
+L{BarrierTaskContext.get()}.
+
+.. versionadded:: 2.4.0
+"""
+
+_barrierContext = None
+
+def __init__(self):
+"""Construct a BarrierTaskContext, use get instead"""
+pass
+
+@classmethod
+def _getOrCreate(cls):
+"""Internal function to get or create global BarrierTaskContext."""
+if cls._taskContext is None:
+cls._taskContext = BarrierTaskContext()
+return cls._taskContext
+
+@classmethod
+def get(cls):
+"""
+Return the currently active BarrierTaskContext. This can be called 
inside of user functions
+to access contextual information about running tasks.
+
+.. note:: Must be called on the worker, not the driver. Returns 
None if not initialized.
+"""
+return cls._taskContext
+
+@classmethod
+def _initialize(cls, ctx):
+"""
+Initialize BarrierTaskContext, other methods within 
BarrierTaskContext can only be called
+after BarrierTaskContext is initialized.
+"""
+cls._barrierContext = ctx
+
+def barrier(self):
+"""
+.. note:: Experimental
+
+Sets a global barrier and waits until all tasks in this stage hit 
this barrier.
+Note this method is only allowed for a BarrierTaskContext.
+
+.. versionadded:: 2.4.0
+"""
+if self._barrierContext is None:
+raise Exception("Not supported to call barrier() before 
initialize " +
+"BarrierTaskContext.")
+else:
+self._barrierContext.barrier()
+
+def getTaskInfos(self):
+"""
+.. note:: Experimental
+
+Returns the all task infos in this barrier stage, the task infos 
are ordered by
+partitionId.
+Note this method is only allowed for a BarrierTaskContext.
+
+.. versionadded:: 2.4.0
+"""
+if self._barrierContext is None:
+raise Exception("Not supported to call getTaskInfos() before 
initialize " +
+"BarrierTaskContext.")
+else:
+java_list = self._barrierContext.getTaskInfos()
+return [BarrierTaskInfo(h) for h in java_list]
+
+
+class BarrierTaskInfo(object):
+"""
+.. note:: Experimental
+
+Carries all task infos of a barrier task.
+
+.. versionadded:: 2.4.0
+"""
+
+def __init__(self, info):
+self.address = info.address
--- End diff --

* should be `info.address()`
* better to rename `info` to `jobj` to make it clear this is from Java


---




[GitHub] spark pull request #22085: [SPARK-25095][PySpark] Python support for Barrier...

2018-08-13 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/22085#discussion_r209835246
  
--- Diff: python/pyspark/taskcontext.py ---
@@ -95,3 +95,92 @@ def getLocalProperty(self, key):
 Get a local property set upstream in the driver, or None if it is 
missing.
 """
 return self._localProperties.get(key, None)
+
+
+class BarrierTaskContext(TaskContext):
+
+"""
+.. note:: Experimental
+
+A TaskContext with extra info and tooling for a barrier stage. To 
access the BarrierTaskContext
+for a running task, use:
+L{BarrierTaskContext.get()}.
+
+.. versionadded:: 2.4.0
+"""
+
+_barrierContext = None
+
+def __init__(self):
+"""Construct a BarrierTaskContext, use get instead"""
+pass
--- End diff --

BTW, I would throw an exception here if calling this method directly should 
be banned.


---




[GitHub] spark pull request #22085: [SPARK-25095][PySpark] Python support for Barrier...

2018-08-13 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/22085#discussion_r209834646
  
--- Diff: 
core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala ---
@@ -180,7 +183,42 @@ private[spark] abstract class BasePythonRunner[IN, 
OUT](
 dataOut.writeInt(partitionIndex)
 // Python version of driver
 PythonRDD.writeUTF(pythonVer, dataOut)
+// Init a GatewayServer to port current BarrierTaskContext to 
Python side.
+val isBarrier = context.isInstanceOf[BarrierTaskContext]
+val secret = if (isBarrier) {
+  Utils.createSecret(env.conf)
+} else {
+  ""
+}
+val gatewayServer: Option[GatewayServer] = if (isBarrier) {
+  Some(new GatewayServer.GatewayServerBuilder()
+.entryPoint(context.asInstanceOf[BarrierTaskContext])
+.authToken(secret)
+.javaPort(0)
+.callbackClient(GatewayServer.DEFAULT_PYTHON_PORT, 
GatewayServer.defaultAddress(),
+  secret)
+.build())
--- End diff --

BTW, this is a design change, so we should probably update 
https://cwiki.apache.org/confluence/display/SPARK/PySpark+Internals too. Is it 
really required to open a gateway there?


---




[GitHub] spark pull request #22085: [SPARK-25095][PySpark] Python support for Barrier...

2018-08-13 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/22085#discussion_r209833972
  
--- Diff: 
core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala ---
@@ -180,7 +183,42 @@ private[spark] abstract class BasePythonRunner[IN, 
OUT](
 dataOut.writeInt(partitionIndex)
 // Python version of driver
 PythonRDD.writeUTF(pythonVer, dataOut)
+// Init a GatewayServer to port current BarrierTaskContext to 
Python side.
+val isBarrier = context.isInstanceOf[BarrierTaskContext]
+val secret = if (isBarrier) {
+  Utils.createSecret(env.conf)
+} else {
+  ""
+}
+val gatewayServer: Option[GatewayServer] = if (isBarrier) {
+  Some(new GatewayServer.GatewayServerBuilder()
+.entryPoint(context.asInstanceOf[BarrierTaskContext])
+.authToken(secret)
+.javaPort(0)
+.callbackClient(GatewayServer.DEFAULT_PYTHON_PORT, 
GatewayServer.defaultAddress(),
+  secret)
+.build())
--- End diff --

Mainly the reasons are resource usage, the unusual access pattern via Py4J 
on the worker side, and the possibility of allowing JVM access from within the 
Python worker.

It looks like overkill to launch a Java gateway just to allow a function 
call, judging from 
https://github.com/apache/spark/pull/22085#discussion_r209490553. This pattern 
is pretty unusual: in such cases we usually send the data manually and read it 
on the Python side, as we do for `TaskContext`. Now it opens a gateway for each 
worker, if I am not mistaken.

I was wondering whether we can avoid this. Can you elaborate on why this is 
required? I haven't had enough time to look into it, so I was planning to take 
a look this weekend.

This also opens up the possibility of JVM access from the worker side via 
`BarrierTaskContext`. For instance, I believe one could hack this to access the 
JVM inside UDFs.
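
The "send the data manually and read it on the Python side" pattern mentioned 
above can be sketched roughly as below. This is an illustration, not PySpark's 
actual serializer code: the helper names and the field layout (stage id, 
partition id, list of addresses) are assumptions, and an in-memory buffer 
stands in for the worker's input stream.

```python
import io
import struct


def write_int(value, stream):
    # 4-byte big-endian signed integer.
    stream.write(struct.pack("!i", value))


def write_utf(text, stream):
    # Length-prefixed UTF-8 string.
    data = text.encode("utf-8")
    write_int(len(data), stream)
    stream.write(data)


def read_int(stream):
    data = stream.read(4)
    if len(data) < 4:
        raise EOFError("stream ended while reading an int")
    return struct.unpack("!i", data)[0]


def read_utf(stream):
    length = read_int(stream)
    return stream.read(length).decode("utf-8")


def write_task_info(stream, stage_id, partition_id, addresses):
    """Writer side (hypothetical layout): two ints, then a list of strings."""
    write_int(stage_id, stream)
    write_int(partition_id, stream)
    write_int(len(addresses), stream)
    for address in addresses:
        write_utf(address, stream)


def read_task_info(stream):
    """Reader side: must consume the fields in exactly the same order."""
    stage_id = read_int(stream)
    partition_id = read_int(stream)
    count = read_int(stream)
    addresses = [read_utf(stream) for _ in range(count)]
    return stage_id, partition_id, addresses
```

The trade-off is that this only moves data in one direction at task start. It 
does not by itself give the worker a way to call back into the JVM, which is 
what a blocking `barrier()` needs.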




---




[GitHub] spark pull request #22085: [SPARK-25095][PySpark] Python support for Barrier...

2018-08-13 Thread mengxr
Github user mengxr commented on a diff in the pull request:

https://github.com/apache/spark/pull/22085#discussion_r209830941
  
--- Diff: 
core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala ---
@@ -180,7 +183,42 @@ private[spark] abstract class BasePythonRunner[IN, 
OUT](
 dataOut.writeInt(partitionIndex)
 // Python version of driver
 PythonRDD.writeUTF(pythonVer, dataOut)
+// Init a GatewayServer to port current BarrierTaskContext to 
Python side.
+val isBarrier = context.isInstanceOf[BarrierTaskContext]
+val secret = if (isBarrier) {
+  Utils.createSecret(env.conf)
+} else {
+  ""
+}
+val gatewayServer: Option[GatewayServer] = if (isBarrier) {
+  Some(new GatewayServer.GatewayServerBuilder()
+.entryPoint(context.asInstanceOf[BarrierTaskContext])
+.authToken(secret)
+.javaPort(0)
+.callbackClient(GatewayServer.DEFAULT_PYTHON_PORT, 
GatewayServer.defaultAddress(),
+  secret)
+.build())
--- End diff --

@HyukjinKwon Could you elaborate on your concerns? Is it because of resource 
usage or security?


---




[GitHub] spark pull request #22085: [SPARK-25095][PySpark] Python support for Barrier...

2018-08-12 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/22085#discussion_r209491244
  
--- Diff: 
core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala ---
@@ -180,7 +183,42 @@ private[spark] abstract class BasePythonRunner[IN, 
OUT](
 dataOut.writeInt(partitionIndex)
 // Python version of driver
 PythonRDD.writeUTF(pythonVer, dataOut)
+// Init a GatewayServer to port current BarrierTaskContext to 
Python side.
+val isBarrier = context.isInstanceOf[BarrierTaskContext]
+val secret = if (isBarrier) {
+  Utils.createSecret(env.conf)
+} else {
+  ""
+}
+val gatewayServer: Option[GatewayServer] = if (isBarrier) {
+  Some(new GatewayServer.GatewayServerBuilder()
+.entryPoint(context.asInstanceOf[BarrierTaskContext])
+.authToken(secret)
+.javaPort(0)
+.callbackClient(GatewayServer.DEFAULT_PYTHON_PORT, 
GatewayServer.defaultAddress(),
+  secret)
+.build())
--- End diff --

If this really must target 2.4.0, don't let me block it: it's a new feature, 
and we could probably consider another approach later. But if we can avoid 
this design, I would suggest avoiding it for now. 

Let me try to track down the design doc and the related changes. I think I 
need more time to check why it ended up like this and whether there's another 
way.


---




[GitHub] spark pull request #22085: [SPARK-25095][PySpark] Python support for Barrier...

2018-08-12 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/22085#discussion_r209491060
  
--- Diff: 
core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala ---
@@ -180,7 +183,42 @@ private[spark] abstract class BasePythonRunner[IN, 
OUT](
 dataOut.writeInt(partitionIndex)
 // Python version of driver
 PythonRDD.writeUTF(pythonVer, dataOut)
+// Init a GatewayServer to port current BarrierTaskContext to 
Python side.
+val isBarrier = context.isInstanceOf[BarrierTaskContext]
+val secret = if (isBarrier) {
+  Utils.createSecret(env.conf)
+} else {
+  ""
+}
+val gatewayServer: Option[GatewayServer] = if (isBarrier) {
+  Some(new GatewayServer.GatewayServerBuilder()
+.entryPoint(context.asInstanceOf[BarrierTaskContext])
+.authToken(secret)
+.javaPort(0)
+.callbackClient(GatewayServer.DEFAULT_PYTHON_PORT, 
GatewayServer.defaultAddress(),
+  secret)
+.build())
--- End diff --

Yea, I read it and understand that this is only initialised when the context 
is a `BarrierTaskContext`, but it is super weird that we start another Java 
gateway here. If it's a hard requirement, then I suspect a design issue. Should 
this target 2.4.0, @mengxr?


---




[GitHub] spark pull request #22085: [SPARK-25095][PySpark] Python support for Barrier...

2018-08-12 Thread jiangxb1987
Github user jiangxb1987 commented on a diff in the pull request:

https://github.com/apache/spark/pull/22085#discussion_r209490553
  
--- Diff: 
core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala ---
@@ -180,7 +183,42 @@ private[spark] abstract class BasePythonRunner[IN, 
OUT](
 dataOut.writeInt(partitionIndex)
 // Python version of driver
 PythonRDD.writeUTF(pythonVer, dataOut)
+// Init a GatewayServer to port current BarrierTaskContext to 
Python side.
+val isBarrier = context.isInstanceOf[BarrierTaskContext]
+val secret = if (isBarrier) {
+  Utils.createSecret(env.conf)
+} else {
+  ""
+}
+val gatewayServer: Option[GatewayServer] = if (isBarrier) {
+  Some(new GatewayServer.GatewayServerBuilder()
+.entryPoint(context.asInstanceOf[BarrierTaskContext])
+.authToken(secret)
+.javaPort(0)
+.callbackClient(GatewayServer.DEFAULT_PYTHON_PORT, 
GatewayServer.defaultAddress(),
+  secret)
+.build())
--- End diff --

We have to port `BarrierTaskContext` from the Java side to the Python side; 
otherwise there is no way to call `BarrierTaskContext.barrier()` from Python. 
Of course, the Java gateway is only started when the context is a 
`BarrierTaskContext`.


---




[GitHub] spark pull request #22085: [SPARK-25095][PySpark] Python support for Barrier...

2018-08-12 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/22085#discussion_r209476191
  
--- Diff: 
core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala ---
@@ -180,7 +183,42 @@ private[spark] abstract class BasePythonRunner[IN, 
OUT](
 dataOut.writeInt(partitionIndex)
 // Python version of driver
 PythonRDD.writeUTF(pythonVer, dataOut)
+// Init a GatewayServer to port current BarrierTaskContext to 
Python side.
+val isBarrier = context.isInstanceOf[BarrierTaskContext]
+val secret = if (isBarrier) {
+  Utils.createSecret(env.conf)
+} else {
+  ""
+}
+val gatewayServer: Option[GatewayServer] = if (isBarrier) {
+  Some(new GatewayServer.GatewayServerBuilder()
+.entryPoint(context.asInstanceOf[BarrierTaskContext])
+.authToken(secret)
+.javaPort(0)
+.callbackClient(GatewayServer.DEFAULT_PYTHON_PORT, 
GatewayServer.defaultAddress(),
+  secret)
+.build())
--- End diff --

Wait, wait. Are you sure we want another Java gateway for each worker? (Or 
did I read this code too quickly?) Can you elaborate on why this is needed? We 
should avoid this unless it's strictly necessary. 


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22085: [SPARK-25095][PySpark] Python support for Barrier...

2018-08-12 Thread mengxr
Github user mengxr commented on a diff in the pull request:

https://github.com/apache/spark/pull/22085#discussion_r209473946
  
--- Diff: 
core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala ---
@@ -180,7 +183,42 @@ private[spark] abstract class BasePythonRunner[IN, 
OUT](
 dataOut.writeInt(partitionIndex)
 // Python version of driver
 PythonRDD.writeUTF(pythonVer, dataOut)
+// Init a GatewayServer to port current BarrierTaskContext to 
Python side.
+val isBarrier = context.isInstanceOf[BarrierTaskContext]
+val secret = if (isBarrier) {
+  Utils.createSecret(env.conf)
+} else {
+  ""
+}
+val gatewayServer: Option[GatewayServer] = if (isBarrier) {
+  Some(new GatewayServer.GatewayServerBuilder()
+.entryPoint(context.asInstanceOf[BarrierTaskContext])
+.authToken(secret)
+.javaPort(0)
+.callbackClient(GatewayServer.DEFAULT_PYTHON_PORT, 
GatewayServer.defaultAddress(),
--- End diff --

Leave a TODO here. We do not have requests from Java to Python.


---




[GitHub] spark pull request #22085: [SPARK-25095][PySpark] Python support for Barrier...

2018-08-12 Thread mengxr
Github user mengxr commented on a diff in the pull request:

https://github.com/apache/spark/pull/22085#discussion_r209473919
  
--- Diff: python/pyspark/taskcontext.py ---
@@ -95,3 +96,33 @@ def getLocalProperty(self, key):
 Get a local property set upstream in the driver, or None if it is 
missing.
 """
 return self._localProperties.get(key, None)
+
+def barrier(self):
+"""
+.. note:: Experimental
+
+Sets a global barrier and waits until all tasks in this stage hit 
this barrier.
+Note this method is only allowed for a BarrierTaskContext.
+
+.. versionadded:: 2.4.0
+"""
+if self._javaContext is None:
+raise Exception("Not supported to call barrier() inside a 
non-barrier task.")
+else:
+self._javaContext.barrier()
+
+def getTaskInfos(self):
+"""
+.. note:: Experimental
+
+Returns the all task infos in this barrier stage, the task infos 
are ordered by
+partitionId.
+Note this method is only allowed for a BarrierTaskContext.
+
+.. versionadded:: 2.4.0
+"""
+if self._javaContext is None:
+raise Exception("Not supported to call getTaskInfos() inside a 
non-barrier task.")
+else:
+java_list = self._javaContext.getTaskInfos()
+return [h for h in java_list]
--- End diff --

Create a `BarrierTaskInfo` class that wraps the Java object.


---




[GitHub] spark pull request #22085: [SPARK-25095][PySpark] Python support for Barrier...

2018-08-12 Thread mengxr
Github user mengxr commented on a diff in the pull request:

https://github.com/apache/spark/pull/22085#discussion_r209473887
  
--- Diff: python/pyspark/taskcontext.py ---
@@ -95,3 +96,33 @@ def getLocalProperty(self, key):
 Get a local property set upstream in the driver, or None if it is 
missing.
 """
 return self._localProperties.get(key, None)
+
+def barrier(self):
--- End diff --

Create a `BarrierTaskContext` class that extends `TaskContext`, then move 
those two methods there.


---




[GitHub] spark pull request #22085: [SPARK-25095][PySpark] Python support for Barrier...

2018-08-12 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/22085#discussion_r209464591
  
--- Diff: python/pyspark/worker.py ---
@@ -275,6 +280,10 @@ def main(infile, outfile):
 shuffle.DiskBytesSpilled = 0
 _accumulatorRegistry.clear()
 
+if isBarrier:
+paras = GatewayParameters(port=boundPort, auth_token=secret, 
auto_convert=True)
--- End diff --

Maybe `params` instead of `paras`?


---




[GitHub] spark pull request #22085: [SPARK-25095][PySpark] Python support for Barrier...

2018-08-12 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/22085#discussion_r209464569
  
--- Diff: python/pyspark/worker.py ---
@@ -261,6 +263,9 @@ def main(infile, outfile):
 
 # initialize global state
 taskContext = TaskContext._getOrCreate()
+isBarrier = read_bool(infile)
--- End diff --

Add a comment indicating that the following 3 inputs are only for barrier 
tasks?


---




[GitHub] spark pull request #22085: [SPARK-25095][PySpark] Python support for Barrier...

2018-08-12 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/22085#discussion_r209464679
  
--- Diff: 
core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala ---
@@ -180,7 +183,42 @@ private[spark] abstract class BasePythonRunner[IN, 
OUT](
 dataOut.writeInt(partitionIndex)
 // Python version of driver
 PythonRDD.writeUTF(pythonVer, dataOut)
+// Init a GatewayServer to port current BarrierTaskContext to 
Python side.
+val isBarrier = context.isInstanceOf[BarrierTaskContext]
+val secret = if (isBarrier) {
+  Utils.createSecret(env.conf)
+} else {
+  ""
+}
+val gatewayServer: Option[GatewayServer] = if (isBarrier) {
+  Some(new GatewayServer.GatewayServerBuilder()
+.entryPoint(context.asInstanceOf[BarrierTaskContext])
+.authToken(secret)
+.javaPort(0)
+.callbackClient(GatewayServer.DEFAULT_PYTHON_PORT, 
GatewayServer.defaultAddress(),
+  secret)
+.build())
+} else {
+  None
+}
+gatewayServer.map(_.start())
+gatewayServer.foreach { server =>
+  context.addTaskCompletionListener(_ => server.shutdown())
+}
+val boundPort: Int = 
gatewayServer.map(_.getListeningPort).getOrElse(0)
+if (boundPort == -1) {
+  val message = "GatewayServer to port BarrierTaskContext failed 
to bind to Java side."
+  logError(message)
+  throw new SparkException(message)
+} else {
+  logDebug(s"Started GatewayServer to port BarrierTaskContext on 
port $boundPort.")
+}
 // Write out the TaskContextInfo
--- End diff --

This comment should be moved too.


---




[GitHub] spark pull request #22085: [SPARK-25095][PySpark] Python support for Barrier...

2018-08-12 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/22085#discussion_r209464515
  
--- Diff: 
core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala ---
@@ -180,7 +183,42 @@ private[spark] abstract class BasePythonRunner[IN, 
OUT](
 dataOut.writeInt(partitionIndex)
 // Python version of driver
 PythonRDD.writeUTF(pythonVer, dataOut)
+// Init a GatewayServer to port current BarrierTaskContext to 
Python side.
+val isBarrier = context.isInstanceOf[BarrierTaskContext]
+val secret = if (isBarrier) {
+  Utils.createSecret(env.conf)
+} else {
+  ""
+}
+val gatewayServer: Option[GatewayServer] = if (isBarrier) {
+  Some(new GatewayServer.GatewayServerBuilder()
+.entryPoint(context.asInstanceOf[BarrierTaskContext])
+.authToken(secret)
+.javaPort(0)
+.callbackClient(GatewayServer.DEFAULT_PYTHON_PORT, 
GatewayServer.defaultAddress(),
+  secret)
+.build())
+} else {
+  None
+}
+gatewayServer.map(_.start())
+gatewayServer.foreach { server =>
+  context.addTaskCompletionListener(_ => server.shutdown())
+}
+val boundPort: Int = 
gatewayServer.map(_.getListeningPort).getOrElse(0)
+if (boundPort == -1) {
+  val message = "GatewayServer to port BarrierTaskContext failed 
to bind to Java side."
+  logError(message)
+  throw new SparkException(message)
+} else {
+  logDebug(s"Started GatewayServer to port BarrierTaskContext on 
port $boundPort.")
--- End diff --

When `isBarrier` is false, I don't think we need to log this?


---




[GitHub] spark pull request #22085: [SPARK-25095][PySpark] Python support for Barrier...

2018-08-12 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/22085#discussion_r209464621
  
--- Diff: python/pyspark/taskcontext.py ---
@@ -29,6 +29,7 @@ class TaskContext(object):
 """
 
 _taskContext = None
+_javaContext = None
--- End diff --

`_barrierContext`?


---




[GitHub] spark pull request #22085: [SPARK-25095][PySpark] Python support for Barrier...

2018-08-12 Thread jiangxb1987
GitHub user jiangxb1987 opened a pull request:

https://github.com/apache/spark/pull/22085

[SPARK-25095][PySpark] Python support for BarrierTaskContext

## What changes were proposed in this pull request?

Add the methods `barrier()` and `getTaskInfos()` to the Python TaskContext. 
These two methods are only allowed for barrier tasks.

## How was this patch tested?

TBD


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/jiangxb1987/spark python.barrier

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/22085.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #22085


commit 7b488299709f715d344e5c38956577f31718ab34
Author: Xingbo Jiang 
Date:   2018-08-12T16:04:20Z

implement python barrier taskcontext




---
