Repository: spark
Updated Branches:
  refs/heads/branch-1.3 ac3591d09 -> 0ce83db11


[SPARK-7810] [PYSPARK] solve python rdd socket connection problem

The method "_load_from_socket" in rdd.py cannot load data from the JVM socket
when IPv6 is used; the current implementation works only with IPv4. This change
makes it work with both protocols.
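The old code built its socket with a bare `socket.socket()`, and Python's default family for that call is AF_INET. It could therefore only reach an IPv4 address for "localhost". The sketch below is Python 3 and standard library only, and the helper names `default_socket_family` and `localhost_families` are illustrative. It contrasts that default with the candidate families `getaddrinfo` reports when AF_UNSPEC is requested.

```python
import socket


def default_socket_family():
    """Family of a socket created with no arguments, as the old code did."""
    with socket.socket() as s:  # defaults: AF_INET, SOCK_STREAM
        return s.family


def localhost_families(port):
    """Address families getaddrinfo() offers for localhost:port.

    AF_UNSPEC asks the resolver for every family it knows about, so on an
    IPv6-ready host the list typically contains AF_INET6 as well as AF_INET.
    """
    infos = socket.getaddrinfo("localhost", port, socket.AF_UNSPEC,
                               socket.SOCK_STREAM)
    # Each entry is (family, type, proto, canonname, sockaddr).
    return [family for family, _type, _proto, _canon, _addr in infos]
```

Consider a host where the JVM side accepts connections only on an IPv6 address. There, the first helper shows why the old single-family connect could fail, and the second lists the candidates that a getaddrinfo-based loop can try in turn.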

Author: Ai He <ai...@ussuning.com>
Author: AiHe <ai...@ussuning.com>

Closes #6338 from AiHe/pyspark-networking-issue and squashes the following 
commits:

d4fc9c4 [Ai He] handle code review 2
e75c5c8 [Ai He] handle code review
5644953 [AiHe] solve python rdd socket connection problem to jvm

(cherry picked from commit ecd3aacf2805bb231cfb44bab079319cfe73c3f1)
Signed-off-by: Davies Liu <dav...@databricks.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/0ce83db1
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/0ce83db1
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/0ce83db1

Branch: refs/heads/branch-1.3
Commit: 0ce83db11f5f91121705a5b72f134e9ffe72d2a6
Parents: ac3591d
Author: Ai He <ai...@ussuning.com>
Authored: Mon Jun 29 14:36:26 2015 -0700
Committer: Davies Liu <dav...@databricks.com>
Committed: Mon Jun 29 14:37:54 2015 -0700

----------------------------------------------------------------------
 python/pyspark/rdd.py | 18 +++++++++++++++---
 1 file changed, 15 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/0ce83db1/python/pyspark/rdd.py
----------------------------------------------------------------------
diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py
index bd18cb3..f27d7a6 100644
--- a/python/pyspark/rdd.py
+++ b/python/pyspark/rdd.py
@@ -112,10 +112,22 @@ def _parse_memory(s):
 
 
 def _load_from_socket(port, serializer):
-    sock = socket.socket()
-    sock.settimeout(3)
+    sock = None
+    # Support for both IPv4 and IPv6.
+    # On most of IPv6-ready systems, IPv6 will take precedence.
+    for res in socket.getaddrinfo("localhost", port, socket.AF_UNSPEC, socket.SOCK_STREAM):
+        af, socktype, proto, canonname, sa = res
+        try:
+            sock = socket.socket(af, socktype, proto)
+            sock.settimeout(3)
+            sock.connect(sa)
+        except socket.error:
+            sock = None
+            continue
+        break
+    if not sock:
+        raise Exception("could not open socket")
     try:
-        sock.connect(("localhost", port))
         rf = sock.makefile("rb", 65536)
         for item in serializer.load_stream(rf):
             yield item
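The patched function tries each address returned by `socket.getaddrinfo` in order and keeps the first socket that connects. The following Python 3 sketch of the same loop is not the code Spark ships. The name `connect_localhost`, its `timeout` parameter, and the error message are illustrative. It differs from the patch in one respect: a socket whose `connect()` fails is closed explicitly before the next candidate is tried, whereas the patch only drops its reference with `sock = None`.

```python
import socket


def connect_localhost(port, timeout=3.0):
    """Open a TCP connection to localhost:port over IPv6 or IPv4.

    Hypothetical helper: walks every (family, type, proto) candidate that
    getaddrinfo() returns for "localhost" and returns the first socket that
    connects. Failed sockets are closed so no file descriptors leak.
    """
    last_error = None
    for af, socktype, proto, _canonname, sa in socket.getaddrinfo(
            "localhost", port, socket.AF_UNSPEC, socket.SOCK_STREAM):
        sock = None
        try:
            # Creating an AF_INET6 socket can itself fail on IPv4-only hosts,
            # so it sits inside the try block too.
            sock = socket.socket(af, socktype, proto)
            sock.settimeout(timeout)
            sock.connect(sa)
            return sock
        except OSError as e:  # socket.error is an alias of OSError in Python 3
            last_error = e
            if sock is not None:
                sock.close()
    raise OSError("could not open socket to localhost:%d" % port) from last_error
```

The returned socket can be wrapped with `sock.makefile("rb", 65536)`, exactly as the patched `_load_from_socket` does before handing the stream to the serializer.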

