Github user jiangxb1987 commented on a diff in the pull request: https://github.com/apache/spark/pull/22085#discussion_r210963511 --- Diff: python/pyspark/taskcontext.py --- @@ -95,3 +99,124 @@ def getLocalProperty(self, key): Get a local property set upstream in the driver, or None if it is missing. """ return self._localProperties.get(key, None) + + +def _load_from_socket(port, auth_secret): + """ + Load data from a given socket, this is a blocking method thus only return when the socket + connection has been closed. + """ + sock = None + # Support for both IPv4 and IPv6. + # On most of IPv6-ready systems, IPv6 will take precedence. + for res in socket.getaddrinfo("localhost", port, socket.AF_UNSPEC, socket.SOCK_STREAM): + af, socktype, proto, canonname, sa = res + sock = socket.socket(af, socktype, proto) + try: + # Do not allow timeout for socket reading operation. + sock.settimeout(None) + sock.connect(sa) + except socket.error: + sock.close() + sock = None + continue + break + if not sock: + raise Exception("could not open socket") + + sockfile = sock.makefile("rwb", 65536) + do_server_auth(sockfile, auth_secret) + + # The socket will be automatically closed when garbage-collected. + return UTF8Deserializer().loads(sockfile) + + +class BarrierTaskContext(TaskContext): + + """ + .. note:: Experimental + + A TaskContext with extra info and tooling for a barrier stage. To access the BarrierTaskContext + for a running task, use: + L{BarrierTaskContext.get()}. + + .. versionadded:: 2.4.0 + """ + + _port = None + _secret = None + + def __init__(self): + """Construct a BarrierTaskContext, use get instead""" + pass + + @classmethod + def _getOrCreate(cls): + """Internal function to get or create global BarrierTaskContext.""" + if cls._taskContext is None: + cls._taskContext = BarrierTaskContext() + return cls._taskContext + + @classmethod + def get(cls): + """ + Return the currently active BarrierTaskContext. 
This can be called inside of user functions + to access contextual information about running tasks. + + .. note:: Must be called on the worker, not the driver. Returns None if not initialized. + """ + return cls._taskContext + + @classmethod + def _initialize(cls, port, secret): + """ + Initialize BarrierTaskContext, other methods within BarrierTaskContext can only be called + after BarrierTaskContext is initialized. + """ + cls._port = port + cls._secret = secret + + def barrier(self): + """ + .. note:: Experimental + + Sets a global barrier and waits until all tasks in this stage hit this barrier. + Note this method is only allowed for a BarrierTaskContext. + + .. versionadded:: 2.4.0 + """ + if self._port is None or self._secret is None: + raise Exception("Not supported to call barrier() before initialize " + + "BarrierTaskContext.") + else: + _load_from_socket(self._port, self._secret) + + def getTaskInfos(self): --- End diff -- This is temporarily unavailable.
--- --------------------------------------------------------------------- To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org