[GitHub] spark pull request #22247: [SPARK-25253][PYSPARK] Refactor local connection ...

2018-08-28 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/spark/pull/22247


---




[GitHub] spark pull request #22247: [SPARK-25253][PYSPARK] Refactor local connection ...

2018-08-28 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/22247#discussion_r213469645
  
--- Diff: python/pyspark/java_gateway.py ---
@@ -147,6 +147,39 @@ def do_server_auth(conn, auth_secret):
         raise Exception("Unexpected reply from iterator server.")
 
 
+def local_connect_and_auth(port, auth_secret):
+    """
+    Connect to local host, authenticate with it, and return a (sockfile,sock) for that connection.
+    Handles IPV4 & IPV6, does some error handling.
+    :param port
+    :param auth_secret
+    :return: a tuple with (sockfile, sock)
+    """
+    sock = None
+    errors = []
+    # Support for both IPv4 and IPv6.
+    # On most of IPv6-ready systems, IPv6 will take precedence.
+    for res in socket.getaddrinfo("127.0.0.1", port, socket.AF_UNSPEC, socket.SOCK_STREAM):
+        af, socktype, proto, _, sa = res
+        sock = socket.socket(af, socktype, proto)
+        try:
+            sock.settimeout(15)
+            sock.connect(sa)
+        except socket.error as e:
+            emsg = _exception_message(e)
+            errors.append("tried to connect to %s, but an error occured: %s" % (sa, emsg))
+            sock.close()
+            sock = None
+            continue
+        break
--- End diff --

done


---




[GitHub] spark pull request #22247: [SPARK-25253][PYSPARK] Refactor local connection ...

2018-08-28 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/22247#discussion_r213400676
  
--- Diff: python/pyspark/java_gateway.py ---
@@ -147,6 +147,39 @@ def do_server_auth(conn, auth_secret):
         raise Exception("Unexpected reply from iterator server.")
 
 
+def local_connect_and_auth(port, auth_secret):
+    """
+    Connect to local host, authenticate with it, and return a (sockfile,sock) for that connection.
+    Handles IPV4 & IPV6, does some error handling.
+    :param port
+    :param auth_secret
+    :return: a tuple with (sockfile, sock)
+    """
+    sock = None
+    errors = []
+    # Support for both IPv4 and IPv6.
+    # On most of IPv6-ready systems, IPv6 will take precedence.
+    for res in socket.getaddrinfo("127.0.0.1", port, socket.AF_UNSPEC, socket.SOCK_STREAM):
+        af, socktype, proto, _, sa = res
+        sock = socket.socket(af, socktype, proto)
+        try:
+            sock.settimeout(15)
+            sock.connect(sa)
+        except socket.error as e:
+            emsg = _exception_message(e)
+            errors.append("tried to connect to %s, but an error occured: %s" % (sa, emsg))
+            sock.close()
+            sock = None
+            continue
+        break
--- End diff --

Slightly shorter (and more "python-compliant"?) -- see the sketch after this list:

- move the socket initialization (and the return) inside the try
- get rid of the continue
- use an else instead of the condition below
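
One way to read that suggestion, as a rough sketch rather than the code that was actually committed; it assumes `do_server_auth` and `_exception_message` (both referenced in the diff above) are in scope, and that authentication happens right after connecting:

```python
def local_connect_and_auth(port, auth_secret):
    """Sketch only: connect to localhost on `port`, authenticate, return (sockfile, sock)."""
    sock = None
    errors = []
    # Try every address family returned for localhost (IPv4 and/or IPv6).
    for res in socket.getaddrinfo("127.0.0.1", port, socket.AF_UNSPEC, socket.SOCK_STREAM):
        af, socktype, proto, _, sa = res
        try:
            # Socket initialization and the return both live inside the try,
            # so neither `continue` nor `break` is needed.
            sock = socket.socket(af, socktype, proto)
            sock.settimeout(15)
            sock.connect(sa)
            sockfile = sock.makefile("rwb", 65536)
            do_server_auth(sockfile, auth_secret)
            return (sockfile, sock)
        except socket.error as e:
            errors.append("tried to connect to %s, but an error occurred: %s"
                          % (sa, _exception_message(e)))
            if sock is not None:
                sock.close()
                sock = None
    else:
        # The for/else takes the place of the `if not sock:` check that followed the loop.
        raise Exception("could not open socket: %s" % errors)
```

Compared to the diff above, the same connection attempts and error collection happen, but the success path exits directly from inside the `try`.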



---




[GitHub] spark pull request #22247: [SPARK-25253][PYSPARK] Refactor local connection ...

2018-08-28 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/22247#discussion_r213398017
  
--- Diff: python/pyspark/taskcontext.py ---
@@ -108,38 +108,12 @@ def _load_from_socket(port, auth_secret):
     """
     Load data from a given socket, this is a blocking method thus only return when the socket
     connection has been closed.
-
-    This is copied from context.py, while modified the message protocol.
     """
-    sock = None
-    # Support for both IPv4 and IPv6.
-    # On most of IPv6-ready systems, IPv6 will take precedence.
-    for res in socket.getaddrinfo("localhost", port, socket.AF_UNSPEC, socket.SOCK_STREAM):
-        af, socktype, proto, canonname, sa = res
-        sock = socket.socket(af, socktype, proto)
-        try:
-            # Do not allow timeout for socket reading operation.
-            sock.settimeout(None)
-            sock.connect(sa)
-        except socket.error:
-            sock.close()
-            sock = None
-            continue
-        break
-    if not sock:
-        raise Exception("could not open socket")
-
-    # We don't really need a socket file here, it's just for convenience that we can reuse the
-    # do_server_auth() function and data serialization methods.
-    sockfile = sock.makefile("rwb", 65536)
-
+    (sockfile, sock) = local_connect_and_auth(port, auth_secret)
--- End diff --

good catch, thanks!  updated


---




[GitHub] spark pull request #22247: [SPARK-25253][PYSPARK] Refactor local connection ...

2018-08-28 Thread jiangxb1987
Github user jiangxb1987 commented on a diff in the pull request:

https://github.com/apache/spark/pull/22247#discussion_r213378931
  
--- Diff: python/pyspark/taskcontext.py ---
@@ -108,38 +108,12 @@ def _load_from_socket(port, auth_secret):
     """
     Load data from a given socket, this is a blocking method thus only return when the socket
     connection has been closed.
-
-    This is copied from context.py, while modified the message protocol.
    """
-    sock = None
-    # Support for both IPv4 and IPv6.
-    # On most of IPv6-ready systems, IPv6 will take precedence.
-    for res in socket.getaddrinfo("localhost", port, socket.AF_UNSPEC, socket.SOCK_STREAM):
-        af, socktype, proto, canonname, sa = res
-        sock = socket.socket(af, socktype, proto)
-        try:
-            # Do not allow timeout for socket reading operation.
-            sock.settimeout(None)
-            sock.connect(sa)
-        except socket.error:
-            sock.close()
-            sock = None
-            continue
-        break
-    if not sock:
-        raise Exception("could not open socket")
-
-    # We don't really need a socket file here, it's just for convenience that we can reuse the
-    # do_server_auth() function and data serialization methods.
-    sockfile = sock.makefile("rwb", 65536)
-
+    (sockfile, sock) = local_connect_and_auth(port, auth_secret)
--- End diff --

We must set the sock timeout to `None` to allow the `barrier()` call to block forever.
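
For illustration, a minimal sketch (assumed names, not necessarily the final call site) of how a caller can get back a socket that blocks indefinitely, given that the helper only applies a short timeout while connecting and authenticating:

```python
# Connect and authenticate under the helper's short timeout, then clear the
# timeout so subsequent reads (e.g. waiting on barrier()) can block forever.
(sockfile, sock) = local_connect_and_auth(port, auth_secret)
sock.settimeout(None)
```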


---




[GitHub] spark pull request #22247: [SPARK-25253][PYSPARK] Refactor local connection ...

2018-08-28 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/22247#discussion_r213324531
  
--- Diff: python/pyspark/java_gateway.py ---
@@ -147,6 +147,39 @@ def do_server_auth(conn, auth_secret):
         raise Exception("Unexpected reply from iterator server.")
 
 
+def local_connect_and_auth(sock_info):
--- End diff --

Done, thanks -- I used the varargs version `*sock_info` for the last one; I forgot about that in Python.
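
For reference, a one-line illustration of the varargs-style call mentioned here, assuming `sock_info` is a `(port, auth_secret)` tuple:

```python
# Unpack the (port, auth_secret) tuple directly into the helper's arguments.
(sockfile, sock) = local_connect_and_auth(*sock_info)
```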


---




[GitHub] spark pull request #22247: [SPARK-25253][PYSPARK] Refactor local connection ...

2018-08-27 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/22247#discussion_r213181568
  
--- Diff: python/pyspark/worker.py ---
@@ -364,8 +364,5 @@ def process():
     # Read information about how to connect back to the JVM from the environment.
     java_port = int(os.environ["PYTHON_WORKER_FACTORY_PORT"])
     auth_secret = os.environ["PYTHON_WORKER_FACTORY_SECRET"]
-    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
-    sock.connect(("127.0.0.1", java_port))
-    sock_file = sock.makefile("rwb", 65536)
--- End diff --

I quickly tested it and it seems to work fine. Please ignore this comment.


---




[GitHub] spark pull request #22247: [SPARK-25253][PYSPARK] Refactor local connection ...

2018-08-27 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/22247#discussion_r213175296
  
--- Diff: python/pyspark/worker.py ---
@@ -364,8 +364,5 @@ def process():
     # Read information about how to connect back to the JVM from the environment.
     java_port = int(os.environ["PYTHON_WORKER_FACTORY_PORT"])
     auth_secret = os.environ["PYTHON_WORKER_FACTORY_SECRET"]
-    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
-    sock.connect(("127.0.0.1", java_port))
-    sock_file = sock.makefile("rwb", 65536)
--- End diff --

@vanzin, BTW, did you test this on Windows too?


---




[GitHub] spark pull request #22247: [SPARK-25253][PYSPARK] Refactor local connection ...

2018-08-27 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/22247#discussion_r213174542
  
--- Diff: python/pyspark/java_gateway.py ---
@@ -147,6 +147,39 @@ def do_server_auth(conn, auth_secret):
         raise Exception("Unexpected reply from iterator server.")
 
 
+def local_connect_and_auth(sock_info):
+    """
+    Connect to local host, authenticate with it, and return a (sockfile,sock) for that connection.
+    Handles IPV4 & IPV6, does some error handling.
+    :param sock_info: a tuple of (port, auth_secret) for connecting
+    :return: a tuple with (sockfile, sock)
+    """
+    port, auth_secret = sock_info
+    sock = None
+    errors = []
+    # Support for both IPv4 and IPv6.
+    # On most of IPv6-ready systems, IPv6 will take precedence.
+    for res in socket.getaddrinfo("127.0.0.1", port, socket.AF_UNSPEC, socket.SOCK_STREAM):
+        af, socktype, proto, canonname, sa = res
--- End diff --

nit: `af, socktype, proto, canonname, sa = res` -> `af, socktype, proto, _, sa = res`


---




[GitHub] spark pull request #22247: [SPARK-25253][PYSPARK] Refactor local connection ...

2018-08-27 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/22247#discussion_r213173845
  
--- Diff: python/pyspark/java_gateway.py ---
@@ -147,6 +147,39 @@ def do_server_auth(conn, auth_secret):
         raise Exception("Unexpected reply from iterator server.")
 
 
+def local_connect_and_auth(sock_info):
--- End diff --

@squito, not a big deal, but how about `local_connect_and_auth(port, auth_secret)` and ...

```python
(sockfile, sock) = local_connect_and_auth(port, auth_secret)
```

```python
(sock_file, _) = local_connect_and_auth(java_port, auth_secret)
```

```python
port, auth_secret = sock_info
(sockfile, sock) = local_connect_and_auth(port, auth_secret)
```

or 

```python
(sockfile, sock) = local_connect_and_auth(*sock_info)
```



---




[GitHub] spark pull request #22247: [SPARK-25253][PYSPARK] Refactor local connection ...

2018-08-27 Thread squito
GitHub user squito opened a pull request:

https://github.com/apache/spark/pull/22247

[SPARK-25253][PYSPARK] Refactor local connection & auth code

This eliminates some duplication in the code that connects to a server on localhost to talk directly to the JVM. It also gives consistent IPv6 and error handling. Two other incidental changes that shouldn't matter:
1) Python barrier tasks perform authentication immediately (rather than waiting for the BARRIER_FUNCTION indicator)
2) for `rdd._load_from_socket`, the timeout is only increased after authentication (see the sketch below)
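
A hedged sketch of incidental change (2); the exact body of `rdd._load_from_socket` is assumed here, only the ordering matters:

```python
def _load_from_socket(sock_info, serializer):
    # local_connect_and_auth connects and authenticates with a short timeout.
    (sockfile, sock) = local_connect_and_auth(*sock_info)
    # The timeout is only increased (removed) after authentication, as noted
    # in the description above, before the potentially long data transfer.
    sock.settimeout(None)
    # Stream the deserialized data back to the caller.
    return serializer.load_stream(sockfile)
```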

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/squito/spark py_connection_refactor

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/22247.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #22247


commit c232ec63f80eea05d3756feb22e53aa5a1e67d93
Author: Imran Rashid 
Date:   2018-08-27T17:07:44Z

[SPARK-25253][PYSPARK] Refactor local connection & auth code




---
