[GitHub] spark pull request #22085: [WIP][SPARK-25095][PySpark] Python support for Ba...

2018-08-17 Thread jiangxb1987
Github user jiangxb1987 commented on a diff in the pull request:

https://github.com/apache/spark/pull/22085#discussion_r210963511
  
--- Diff: python/pyspark/taskcontext.py ---
@@ -95,3 +99,124 @@ def getLocalProperty(self, key):
         Get a local property set upstream in the driver, or None if it is missing.
         """
         return self._localProperties.get(key, None)
+
+
+def _load_from_socket(port, auth_secret):
+    """
+    Load data from a given socket, this is a blocking method thus only return when the socket
+    connection has been closed.
+    """
+    sock = None
+    # Support for both IPv4 and IPv6.
+    # On most of IPv6-ready systems, IPv6 will take precedence.
+    for res in socket.getaddrinfo("localhost", port, socket.AF_UNSPEC, socket.SOCK_STREAM):
+        af, socktype, proto, canonname, sa = res
+        sock = socket.socket(af, socktype, proto)
+        try:
+            # Do not allow timeout for socket reading operation.
+            sock.settimeout(None)
+            sock.connect(sa)
+        except socket.error:
+            sock.close()
+            sock = None
+            continue
+        break
+    if not sock:
+        raise Exception("could not open socket")
+
+    sockfile = sock.makefile("rwb", 65536)
+    do_server_auth(sockfile, auth_secret)
+
+    # The socket will be automatically closed when garbage-collected.
+    return UTF8Deserializer().loads(sockfile)
+
+
+class BarrierTaskContext(TaskContext):
+
+    """
+    .. note:: Experimental
+
+    A TaskContext with extra info and tooling for a barrier stage. To access the BarrierTaskContext
+    for a running task, use:
+    L{BarrierTaskContext.get()}.
+
+    .. versionadded:: 2.4.0
+    """
+
+    _port = None
+    _secret = None
+
+    def __init__(self):
+        """Construct a BarrierTaskContext, use get instead"""
+        pass
+
+    @classmethod
+    def _getOrCreate(cls):
+        """Internal function to get or create global BarrierTaskContext."""
+        if cls._taskContext is None:
+            cls._taskContext = BarrierTaskContext()
+        return cls._taskContext
+
+    @classmethod
+    def get(cls):
+        """
+        Return the currently active BarrierTaskContext. This can be called inside of user functions
+        to access contextual information about running tasks.
+
+        .. note:: Must be called on the worker, not the driver. Returns None if not initialized.
+        """
+        return cls._taskContext
+
+    @classmethod
+    def _initialize(cls, port, secret):
+        """
+        Initialize BarrierTaskContext, other methods within BarrierTaskContext can only be called
+        after BarrierTaskContext is initialized.
+        """
+        cls._port = port
+        cls._secret = secret
+
+    def barrier(self):
+        """
+        .. note:: Experimental
+
+        Sets a global barrier and waits until all tasks in this stage hit this barrier.
+        Note this method is only allowed for a BarrierTaskContext.
+
+        .. versionadded:: 2.4.0
+        """
+        if self._port is None or self._secret is None:
+            raise Exception("Not supported to call barrier() before initialize " +
+                            "BarrierTaskContext.")
+        else:
+            _load_from_socket(self._port, self._secret)
+
+    def getTaskInfos(self):
--- End diff --

This is temporarily unavailable.
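For readers of this thread: the diff's `_load_from_socket` connects to a local port (trying each address `getaddrinfo` returns), authenticates with the secret, and then blocks on a single read; that blocking read is what makes `barrier()` wait. Below is a minimal, stdlib-only sketch of that client-side pattern. Everything in it is hypothetical: `fake_jvm_server` stands in for the JVM side, and `write_utf8`/`read_utf8` assume a 4-byte big-endian length prefix followed by UTF-8 bytes, modeled on (not taken from) PySpark's serializers.

```python
import socket
import struct
import threading


# Hypothetical framing helpers: 4-byte big-endian signed length, then UTF-8 bytes.
def write_utf8(f, text):
    data = text.encode("utf-8")
    f.write(struct.pack("!i", len(data)))
    f.write(data)
    f.flush()


def read_utf8(f):
    header = f.read(4)
    if len(header) < 4:
        raise EOFError("connection closed before a length header arrived")
    (length,) = struct.unpack("!i", header)
    data = f.read(length)
    if len(data) < length:
        raise EOFError("connection closed mid-message")
    return data.decode("utf-8")


def connect_localhost(port):
    """Try each address getaddrinfo returns for localhost (IPv6 and/or IPv4)."""
    for af, socktype, proto, _name, sa in socket.getaddrinfo(
            "localhost", port, socket.AF_UNSPEC, socket.SOCK_STREAM):
        try:
            sock = socket.socket(af, socktype, proto)
        except OSError:
            continue  # address family not supported on this host
        try:
            sock.settimeout(None)  # blocking reads: the barrier may take a while
            sock.connect(sa)
            return sock
        except OSError:
            sock.close()  # e.g. refused on ::1; fall through to the next address
    raise OSError("could not open socket")


def fake_jvm_server(server_sock, secret, release):
    """Hypothetical JVM stand-in: check the secret, reply "ok", then answer once released."""
    conn, _ = server_sock.accept()
    with conn, conn.makefile("rwb") as f:
        if read_utf8(f) != secret:
            write_utf8(f, "err")
            return
        write_utf8(f, "ok")
        release.wait()  # e.g. until every task in the stage has arrived
        write_utf8(f, "barrier released")


def barrier_client(port, secret):
    """Connect, authenticate, then block until the server sends its reply."""
    sock = connect_localhost(port)
    with sock, sock.makefile("rwb", 65536) as f:
        write_utf8(f, secret)
        if read_utf8(f) != "ok":
            raise RuntimeError("authentication failed")
        return read_utf8(f)  # this read is where the caller waits


def run_demo(client_secret, server_secret="s3cret"):
    """Run the fake server on an ephemeral IPv4 port and one client against it."""
    server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    server.settimeout(5)  # do not hang forever if the client never connects
    server.bind(("127.0.0.1", 0))
    server.listen(1)
    release = threading.Event()
    worker = threading.Thread(target=fake_jvm_server,
                              args=(server, server_secret, release))
    worker.start()
    threading.Timer(0.1, release.set).start()  # "all tasks arrived" after 100 ms
    try:
        return barrier_client(server.getsockname()[1], client_secret)
    finally:
        worker.join()
        server.close()


if __name__ == "__main__":
    print(run_demo("s3cret"))  # prints: barrier released
```

Because the client simply blocks until the reply arrives, a read timeout would presumably abort a legitimately slow barrier; the diff's `sock.settimeout(None)` avoids that.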


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Github user jiangxb1987 commented on a diff in the pull request:

https://github.com/apache/spark/pull/22085#discussion_r210963181
  
--- Diff: core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala ---
@@ -381,6 +421,45 @@ private[spark] abstract class BasePythonRunner[IN, OUT](
   }
 }
   }
+
+  /**
+   * Gateway to call BarrierTaskContext.barrier().
+   */
+  def barrierAndServe(): Unit = {
--- End diff --

It's not clear yet how to trigger this.
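Whatever ends up triggering the gateway, it has to honor the contract stated in the `barrier()` docstring of the taskcontext.py diff: no task continues until every task in the stage has reached the barrier. The stdlib-only sketch below illustrates only that contract, with `threading.Barrier` standing in for the stage-wide coordination. Using threads in one process is an assumption for illustration (Spark tasks run in separate executors), and the sketch says nothing about how `barrierAndServe()` would be invoked; `run_stage` is a hypothetical name.

```python
import threading
import time


def run_stage(num_tasks=4):
    """Simulate `num_tasks` tasks that each hit one barrier; return the event log."""
    barrier = threading.Barrier(num_tasks)  # stand-in for stage-wide coordination
    lock = threading.Lock()
    events = []

    def task(task_id):
        time.sleep(0.01 * task_id)  # tasks reach the barrier at different times
        with lock:
            events.append(("arrive", task_id))
        barrier.wait()  # returns only after all num_tasks threads have called wait()
        with lock:
            events.append(("leave", task_id))

    threads = [threading.Thread(target=task, args=(i,)) for i in range(num_tasks)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return events


if __name__ == "__main__":
    kinds = [kind for kind, _ in run_stage()]
    # Every "arrive" is logged before any "leave", whatever the arrival order.
    print(kinds)  # ['arrive', 'arrive', 'arrive', 'arrive', 'leave', 'leave', 'leave', 'leave']
```

Each "arrive" is appended before that task calls `wait()`, and no `wait()` returns until all tasks have called it, so the ordering holds on every run, not just this one.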

