[GitHub] spark pull request #22247: [SPARK-25253][PYSPARK] Refactor local connection ...
Github user asfgit closed the pull request at:

    https://github.com/apache/spark/pull/22247

---
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org
Github user squito commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22247#discussion_r213469645

--- Diff: python/pyspark/java_gateway.py ---
@@ -147,6 +147,39 @@ def do_server_auth(conn, auth_secret):
             raise Exception("Unexpected reply from iterator server.")
 
 
+def local_connect_and_auth(port, auth_secret):
+    """
+    Connect to local host, authenticate with it, and return a (sockfile, sock) for that
+    connection. Handles IPV4 & IPV6, does some error handling.
+    :param port
+    :param auth_secret
+    :return: a tuple with (sockfile, sock)
+    """
+    sock = None
+    errors = []
+    # Support for both IPv4 and IPv6.
+    # On most of IPv6-ready systems, IPv6 will take precedence.
+    for res in socket.getaddrinfo("127.0.0.1", port, socket.AF_UNSPEC, socket.SOCK_STREAM):
+        af, socktype, proto, _, sa = res
+        sock = socket.socket(af, socktype, proto)
+        try:
+            sock.settimeout(15)
+            sock.connect(sa)
+        except socket.error as e:
+            emsg = _exception_message(e)
+            errors.append("tried to connect to %s, but an error occured: %s" % (sa, emsg))
+            sock.close()
+            sock = None
+            continue
+        break
--- End diff --

done

---
Github user vanzin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22247#discussion_r213400676

--- Diff: python/pyspark/java_gateway.py ---
@@ -147,6 +147,39 @@ def do_server_auth(conn, auth_secret):
             raise Exception("Unexpected reply from iterator server.")
 
 
+def local_connect_and_auth(port, auth_secret):
+    """
+    Connect to local host, authenticate with it, and return a (sockfile, sock) for that
+    connection. Handles IPV4 & IPV6, does some error handling.
+    :param port
+    :param auth_secret
+    :return: a tuple with (sockfile, sock)
+    """
+    sock = None
+    errors = []
+    # Support for both IPv4 and IPv6.
+    # On most of IPv6-ready systems, IPv6 will take precedence.
+    for res in socket.getaddrinfo("127.0.0.1", port, socket.AF_UNSPEC, socket.SOCK_STREAM):
+        af, socktype, proto, _, sa = res
+        sock = socket.socket(af, socktype, proto)
+        try:
+            sock.settimeout(15)
+            sock.connect(sa)
+        except socket.error as e:
+            emsg = _exception_message(e)
+            errors.append("tried to connect to %s, but an error occured: %s" % (sa, emsg))
+            sock.close()
+            sock = None
+            continue
+        break
--- End diff --

Slightly shorter (and more "python-compliant"?):

- move the socket initialization (and the return) inside the try
- get rid of the continue
- use an else instead of the condition below

---
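[Editorial note] A sketch of what vanzin's suggestions might look like when applied to the loop in the diff above. This is an illustrative restructuring under assumed names (`local_connect` is hypothetical), not the code that was actually merged; returning from inside the `try` takes the place of both the `continue` and the `break`, so the post-loop check disappears as well.

```python
import socket

# Hypothetical restructuring per vanzin's review comment: socket creation
# stays per-address, but success exits via return inside the try, so no
# continue/break bookkeeping and no trailing "if not sock" check is needed.
def local_connect(port, timeout=15):
    errors = []
    for af, socktype, proto, _, sa in socket.getaddrinfo(
            "127.0.0.1", port, socket.AF_UNSPEC, socket.SOCK_STREAM):
        sock = socket.socket(af, socktype, proto)
        try:
            sock.settimeout(timeout)
            sock.connect(sa)
            # hand back a buffered file object plus the raw socket
            return sock.makefile("rwb", 65536), sock
        except socket.error as e:
            errors.append("tried to connect to %s, but an error occurred: %s" % (sa, e))
            sock.close()
    raise Exception("could not open socket: %s" % errors)
```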
Github user squito commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22247#discussion_r213398017

--- Diff: python/pyspark/taskcontext.py ---
@@ -108,38 +108,12 @@ def _load_from_socket(port, auth_secret):
     """
     Load data from a given socket, this is a blocking method thus only return when the socket
     connection has been closed.
-
-    This is copied from context.py, while modified the message protocol.
     """
-    sock = None
-    # Support for both IPv4 and IPv6.
-    # On most of IPv6-ready systems, IPv6 will take precedence.
-    for res in socket.getaddrinfo("localhost", port, socket.AF_UNSPEC, socket.SOCK_STREAM):
-        af, socktype, proto, canonname, sa = res
-        sock = socket.socket(af, socktype, proto)
-        try:
-            # Do not allow timeout for socket reading operation.
-            sock.settimeout(None)
-            sock.connect(sa)
-        except socket.error:
-            sock.close()
-            sock = None
-            continue
-        break
-    if not sock:
-        raise Exception("could not open socket")
-
-    # We don't really need a socket file here, it's just for convenience that we can reuse the
-    # do_server_auth() function and data serialization methods.
-    sockfile = sock.makefile("rwb", 65536)
-
+    (sockfile, sock) = local_connect_and_auth(port, auth_secret)
--- End diff --

good catch, thanks! updated

---
Github user jiangxb1987 commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22247#discussion_r213378931

--- Diff: python/pyspark/taskcontext.py ---
@@ -108,38 +108,12 @@ def _load_from_socket(port, auth_secret):
     """
     Load data from a given socket, this is a blocking method thus only return when the socket
     connection has been closed.
-
-    This is copied from context.py, while modified the message protocol.
     """
-    sock = None
-    # Support for both IPv4 and IPv6.
-    # On most of IPv6-ready systems, IPv6 will take precedence.
-    for res in socket.getaddrinfo("localhost", port, socket.AF_UNSPEC, socket.SOCK_STREAM):
-        af, socktype, proto, canonname, sa = res
-        sock = socket.socket(af, socktype, proto)
-        try:
-            # Do not allow timeout for socket reading operation.
-            sock.settimeout(None)
-            sock.connect(sa)
-        except socket.error:
-            sock.close()
-            sock = None
-            continue
-        break
-    if not sock:
-        raise Exception("could not open socket")
-
-    # We don't really need a socket file here, it's just for convenience that we can reuse the
-    # do_server_auth() function and data serialization methods.
-    sockfile = sock.makefile("rwb", 65536)
-
+    (sockfile, sock) = local_connect_and_auth(port, auth_secret)
--- End diff --

We must set the sock timeout to `None` to allow the `barrier()` call to block forever.

---
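[Editorial note] The timeout distinction jiangxb1987 raises can be shown with a standalone demo (not Spark code): `barrier()` may block indefinitely waiting for the other tasks, so the socket used for that read must carry no timeout. `settimeout(None)` restores fully blocking mode, whereas any numeric timeout would make a long read raise `socket.timeout`.

```python
import socket

# A numeric timeout is appropriate for the connect/auth phase (fail fast),
# but must be cleared to None before a potentially unbounded blocking read.
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.settimeout(15)            # connect phase: give up after 15 seconds
connect_timeout = sock.gettimeout()
sock.settimeout(None)          # read phase for barrier(): block forever
read_timeout = sock.gettimeout()
sock.close()
```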
Github user squito commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22247#discussion_r213324531

--- Diff: python/pyspark/java_gateway.py ---
@@ -147,6 +147,39 @@ def do_server_auth(conn, auth_secret):
             raise Exception("Unexpected reply from iterator server.")
 
 
+def local_connect_and_auth(sock_info):
--- End diff --

done, thanks -- I used the varargs version `*sock_info` for the last one; I forgot about that in Python.

---
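[Editorial note] The `*sock_info` call squito mentions is ordinary argument splatting: a `(port, auth_secret)` tuple can be passed to a two-parameter function without manual unpacking. A toy illustration follows; the helper body and the values are stand-ins, not Spark's implementation.

```python
# Stand-in for the real helper, which opens and authenticates a socket;
# only the call shapes below mirror the review discussion.
def local_connect_and_auth(port, auth_secret):
    return port, auth_secret

sock_info = (45123, "s3cret")                    # hypothetical values
direct = local_connect_and_auth(45123, "s3cret")
splatted = local_connect_and_auth(*sock_info)    # equivalent varargs-style call
```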
Github user HyukjinKwon commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22247#discussion_r213181568

--- Diff: python/pyspark/worker.py ---
@@ -364,8 +364,5 @@ def process():
     # Read information about how to connect back to the JVM from the environment.
     java_port = int(os.environ["PYTHON_WORKER_FACTORY_PORT"])
     auth_secret = os.environ["PYTHON_WORKER_FACTORY_SECRET"]
-    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
-    sock.connect(("127.0.0.1", java_port))
-    sock_file = sock.makefile("rwb", 65536)
--- End diff --

I quickly tested and it seems to work fine. Please ignore this comment.

---
Github user HyukjinKwon commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22247#discussion_r213175296

--- Diff: python/pyspark/worker.py ---
@@ -364,8 +364,5 @@ def process():
     # Read information about how to connect back to the JVM from the environment.
     java_port = int(os.environ["PYTHON_WORKER_FACTORY_PORT"])
     auth_secret = os.environ["PYTHON_WORKER_FACTORY_SECRET"]
-    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
-    sock.connect(("127.0.0.1", java_port))
-    sock_file = sock.makefile("rwb", 65536)
--- End diff --

@vanzin, BTW, did you test this on Windows too?

---
Github user HyukjinKwon commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22247#discussion_r213174542

--- Diff: python/pyspark/java_gateway.py ---
@@ -147,6 +147,39 @@ def do_server_auth(conn, auth_secret):
             raise Exception("Unexpected reply from iterator server.")
 
 
+def local_connect_and_auth(sock_info):
+    """
+    Connect to local host, authenticate with it, and return a (sockfile, sock) for that
+    connection. Handles IPV4 & IPV6, does some error handling.
+    :param sock_info: a tuple of (port, auth_secret) for connecting
+    :return: a tuple with (sockfile, sock)
+    """
+    port, auth_secret = sock_info
+    sock = None
+    errors = []
+    # Support for both IPv4 and IPv6.
+    # On most of IPv6-ready systems, IPv6 will take precedence.
+    for res in socket.getaddrinfo("127.0.0.1", port, socket.AF_UNSPEC, socket.SOCK_STREAM):
+        af, socktype, proto, canonname, sa = res
--- End diff --

nit: `af, socktype, proto, canonname, sa = res` -> `af, socktype, proto, _, sa = res`

---
Github user HyukjinKwon commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22247#discussion_r213173845

--- Diff: python/pyspark/java_gateway.py ---
@@ -147,6 +147,39 @@ def do_server_auth(conn, auth_secret):
             raise Exception("Unexpected reply from iterator server.")
 
 
+def local_connect_and_auth(sock_info):
--- End diff --

@squito, not a big deal but how about `local_connect_and_auth(port, auth_secret)` and ..

```python
(sockfile, sock) = local_connect_and_auth(port, auth_secret)
```

```python
(sock_file, _) = local_connect_and_auth(java_port, auth_secret)
```

```python
port, auth_secret = sock_info
(sockfile, sock) = local_connect_and_auth(port, auth_secret)
```

or

```python
(sockfile, sock) = local_connect_and_auth(*sock_info)
```

---
GitHub user squito opened a pull request:

    https://github.com/apache/spark/pull/22247

[SPARK-25253][PYSPARK] Refactor local connection & auth code

This eliminates some duplication in the code that connects to a server on localhost to talk directly to the JVM. It also gives consistent IPv6 and error handling. Two other incidental changes, which shouldn't matter:

1) Python barrier tasks perform authentication immediately (rather than waiting for the BARRIER_FUNCTION indicator).
2) For `rdd._load_from_socket`, the timeout is only increased after authentication.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/squito/spark py_connection_refactor

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/22247.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #22247

commit c232ec63f80eea05d3756feb22e53aa5a1e67d93
Author: Imran Rashid
Date: 2018-08-27T17:07:44Z

    [SPARK-25253][PYSPARK] Refactor local connection & auth code

---
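[Editorial note] The consolidation the PR describes can be sketched as follows. The helper body is a simplified stand-in (IPv4 only, no auth handshake); only the name and the call-site shapes mirror the review thread, where `taskcontext.py` ends up with `(sockfile, sock) = local_connect_and_auth(port, auth_secret)` and `worker.py` with `(sock_file, _) = local_connect_and_auth(java_port, auth_secret)`.

```python
import socket

# Simplified stand-in for the refactored helper: the point of the PR is that
# the previously duplicated connect code in taskcontext.py, rdd.py, and
# worker.py all collapses to one call like this.
def local_connect_and_auth(port, auth_secret):
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    sock.settimeout(15)
    sock.connect(("127.0.0.1", port))
    # the real helper would run the auth handshake with auth_secret here
    return sock.makefile("rwb", 65536), sock
```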