Repository: spark
Updated Branches:
  refs/heads/branch-1.3 ac3591d09 -> 0ce83db11

[SPARK-7810] [PYSPARK] solve python rdd socket connection problem

The method "_load_from_socket" in rdd.py cannot load data from the JVM socket when IPv6 is used. The current method only works with IPv4. The modified method should work with both protocols.

Author: Ai He <ai...@ussuning.com>
Author: AiHe <ai...@ussuning.com>

Closes #6338 from AiHe/pyspark-networking-issue and squashes the following commits:

d4fc9c4 [Ai He] handle code review 2
e75c5c8 [Ai He] handle code review
5644953 [AiHe] solve python rdd socket connection problem to jvm

(cherry picked from commit ecd3aacf2805bb231cfb44bab079319cfe73c3f1)
Signed-off-by: Davies Liu <dav...@databricks.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/0ce83db1
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/0ce83db1
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/0ce83db1

Branch: refs/heads/branch-1.3
Commit: 0ce83db11f5f91121705a5b72f134e9ffe72d2a6
Parents: ac3591d
Author: Ai He <ai...@ussuning.com>
Authored: Mon Jun 29 14:36:26 2015 -0700
Committer: Davies Liu <dav...@databricks.com>
Committed: Mon Jun 29 14:37:54 2015 -0700

----------------------------------------------------------------------
 python/pyspark/rdd.py | 18 +++++++++++++++---
 1 file changed, 15 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/0ce83db1/python/pyspark/rdd.py
----------------------------------------------------------------------
diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py
index bd18cb3..f27d7a6 100644
--- a/python/pyspark/rdd.py
+++ b/python/pyspark/rdd.py
@@ -112,10 +112,22 @@ def _parse_memory(s):
 
 
 def _load_from_socket(port, serializer):
-    sock = socket.socket()
-    sock.settimeout(3)
+    sock = None
+    # Support for both IPv4 and IPv6.
+    # On most of IPv6-ready systems, IPv6 will take precedence.
+    for res in socket.getaddrinfo("localhost", port, socket.AF_UNSPEC, socket.SOCK_STREAM):
+        af, socktype, proto, canonname, sa = res
+        try:
+            sock = socket.socket(af, socktype, proto)
+            sock.settimeout(3)
+            sock.connect(sa)
+        except socket.error:
+            sock = None
+            continue
+        break
+    if not sock:
+        raise Exception("could not open socket")
     try:
-        sock.connect(("localhost", port))
         rf = sock.makefile("rb", 65536)
         for item in serializer.load_stream(rf):
             yield item

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
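
For readers who want to try the connection strategy from this patch outside PySpark, here is a minimal standalone sketch in Python 3. It follows the same idea as the diff: resolve "localhost" with socket.AF_UNSPEC and try each returned address until one connects. The names connect_to_localhost and demo are hypothetical helpers made up for this illustration and are not part of PySpark. The sketch also differs from the patch in small ways that are assumptions of this example: it catches OSError (socket.error is an alias of it in Python 3), closes sockets that fail to connect, and raises OSError instead of a bare Exception.

```python
import socket


def connect_to_localhost(port, timeout=3.0):
    """Connect to localhost:port over whichever of IPv4/IPv6 accepts first.

    Hypothetical helper for illustration only; not part of PySpark.
    """
    last_error = None
    # AF_UNSPEC asks the resolver for both IPv4 and IPv6 candidates.
    candidates = socket.getaddrinfo(
        "localhost", port, socket.AF_UNSPEC, socket.SOCK_STREAM)
    for af, socktype, proto, _canonname, sockaddr in candidates:
        sock = None
        try:
            sock = socket.socket(af, socktype, proto)
            sock.settimeout(timeout)
            sock.connect(sockaddr)
            return sock
        except OSError as err:
            # This candidate failed; release it and try the next address.
            last_error = err
            if sock is not None:
                sock.close()
    raise OSError("could not open socket to localhost:%d (%s)" % (port, last_error))


def demo():
    """Serve on the IPv4 loopback only and read 4 bytes through the helper."""
    server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    server.settimeout(5)
    server.bind(("127.0.0.1", 0))  # port 0: let the OS pick a free port
    server.listen(1)
    port = server.getsockname()[1]
    client = connect_to_localhost(port)
    conn, _addr = server.accept()
    try:
        conn.sendall(b"ping")
        # Buffered file object, as in _load_from_socket.
        rf = client.makefile("rb", 65536)
        return rf.read(4)
    finally:
        conn.close()
        client.close()
        server.close()


if __name__ == "__main__":
    print(demo())
```

In the demo the server listens on 127.0.0.1 only, so if the resolver returns an IPv6 address first, that attempt fails and the loop falls through to IPv4. The standard library's socket.create_connection(("localhost", port), timeout=3) performs a similar loop over getaddrinfo results and may be a simpler choice where it is available.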