[ https://issues.apache.org/jira/browse/SPARK-26019?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16719308#comment-16719308 ]

ASF GitHub Bot commented on SPARK-26019:
----------------------------------------

vanzin closed pull request #23113: [SPARK-26019][PYTHON] Fix race condition in accumulators.py: _start_update_server()
URL: https://github.com/apache/spark/pull/23113
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

diff --git a/python/pyspark/accumulators.py b/python/pyspark/accumulators.py
index 00ec094e7e3b4..5023fd3fd968c 100644
--- a/python/pyspark/accumulators.py
+++ b/python/pyspark/accumulators.py
@@ -271,8 +271,17 @@ def authenticate_and_accum_updates():
 class AccumulatorServer(SocketServer.TCPServer):
 
     def __init__(self, server_address, RequestHandlerClass, auth_token):
-        SocketServer.TCPServer.__init__(self, server_address, RequestHandlerClass)
         self.auth_token = auth_token
+        SocketServer.TCPServer.__init__(self, server_address, RequestHandlerClass, bind_and_activate=False)
+
+    def bind_and_serve(self):
+        try:
+            self.server_bind()
+            self.server_activate()
+            self.serve_forever()
+        except:
+            self.shutdown()
+            raise
 
     """
     A simple TCP server that intercepts shutdown() in order to interrupt
@@ -289,7 +298,7 @@ def shutdown(self):
 def _start_update_server(auth_token):
     """Start a TCP server to receive accumulator updates in a daemon thread, 
and returns it"""
     server = AccumulatorServer(("localhost", 0), _UpdateRequestHandler, 
auth_token)
-    thread = threading.Thread(target=server.serve_forever)
+    thread = threading.Thread(target=server.bind_and_serve)
     thread.daemon = True
     thread.start()
     return server
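
For context, the change works by assigning auth_token before the socket is
bound, and deferring bind/activate to the serving thread, so no request can
be handled while the attribute is still unset. A minimal standalone sketch of
the same technique on Python 3's socketserver (the TokenServer and EchoHandler
names are invented for the demo; the diff above uses the Python 2
SocketServer module and shutdown() rather than server_close()):

{code:python}
import socketserver
import threading

class TokenServer(socketserver.TCPServer):
    def __init__(self, server_address, RequestHandlerClass, auth_token):
        # Assign the attribute first and skip binding in the constructor
        # (bind_and_activate=False), so no handler can ever observe the
        # server before auth_token exists.
        self.auth_token = auth_token
        socketserver.TCPServer.__init__(
            self, server_address, RequestHandlerClass, bind_and_activate=False)

    def bind_and_serve(self):
        # Bind and activate on the serving thread, then accept requests.
        try:
            self.server_bind()
            self.server_activate()
            self.serve_forever()
        except BaseException:
            self.server_close()  # release the socket on any failure
            raise

class EchoHandler(socketserver.StreamRequestHandler):
    def handle(self):
        # auth_token is guaranteed to be set before any request is handled.
        self.wfile.write(self.server.auth_token.encode())

server = TokenServer(("localhost", 0), EchoHandler, "secret")
thread = threading.Thread(target=server.bind_and_serve)
thread.daemon = True
thread.start()
{code}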


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> pyspark/accumulators.py: "TypeError: object of type 'NoneType' has no len()" in authenticate_and_accum_updates()
> ----------------------------------------------------------------------------------------------------------------
>
>                 Key: SPARK-26019
>                 URL: https://issues.apache.org/jira/browse/SPARK-26019
>             Project: Spark
>          Issue Type: Bug
>          Components: PySpark
>    Affects Versions: 2.3.2, 2.4.0
>            Reporter: Ruslan Dautkhanov
>            Priority: Major
>
> PySpark's accumulator server expects a secure py4j connection between Python 
> and the JVM.  Spark will normally create a secure connection, but there is a 
> public API which allows you to pass in your own py4j connection.  (This is 
> used by Zeppelin, at least.)  When this happens, you get an error like:
> {noformat}
> pyspark/accumulators.py: "TypeError: object of type 'NoneType' has no len()" in authenticate_and_accum_updates()
> {noformat}
> We should change PySpark to:
> 1) warn loudly if a user passes in an insecure connection
> 1a) I'd like to suggest that we even error out, unless the user actively 
> opts in with a config like "spark.python.allowInsecurePy4j=true" (see the 
> sketch below)
> 2) change the accumulator server to allow insecure connections.
> Note that SPARK-26349 will disallow insecure connections completely in 3.0.
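>
> A rough sketch of the opt-in gate proposed in 1a). The helper name is
> invented; only the config name comes from the suggestion above, and the
> gateway.gateway_parameters.auth_token lookup assumes a py4j JavaGateway
> and a pyspark SparkConf:
> {code:python}
> import warnings
>
> def _ensure_secure_gateway(gateway, conf):
>     """Hypothetical guard: reject an externally supplied py4j gateway
>     that has no auth token, unless the user explicitly opted in."""
>     if gateway.gateway_parameters.auth_token is None:
>         if conf.get("spark.python.allowInsecurePy4j", "false") != "true":
>             raise RuntimeError(
>                 "An insecure py4j gateway was provided; set "
>                 "spark.python.allowInsecurePy4j=true to allow it.")
>         warnings.warn("Using an insecure py4j gateway; accumulator "
>                       "updates will not be authenticated.")
> {code}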
>  
> More info on how this occurs:
> {code:python}
> Exception happened during processing of request from ('127.0.0.1', 43418)
> ----------------------------------------
> Traceback (most recent call last):
>   File "/opt/cloudera/parcels/Anaconda/lib/python2.7/SocketServer.py", line 290, in _handle_request_noblock
>     self.process_request(request, client_address)
>   File "/opt/cloudera/parcels/Anaconda/lib/python2.7/SocketServer.py", line 318, in process_request
>     self.finish_request(request, client_address)
>   File "/opt/cloudera/parcels/Anaconda/lib/python2.7/SocketServer.py", line 331, in finish_request
>     self.RequestHandlerClass(request, client_address, self)
>   File "/opt/cloudera/parcels/Anaconda/lib/python2.7/SocketServer.py", line 652, in __init__
>     self.handle()
>   File "/opt/cloudera/parcels/SPARK2-2.3.0.cloudera4-1.cdh5.13.3.p0.611179/lib/spark2/python/lib/pyspark.zip/pyspark/accumulators.py", line 263, in handle
>     poll(authenticate_and_accum_updates)
>   File "/opt/cloudera/parcels/SPARK2-2.3.0.cloudera4-1.cdh5.13.3.p0.611179/lib/spark2/python/lib/pyspark.zip/pyspark/accumulators.py", line 238, in poll
>     if func():
>   File "/opt/cloudera/parcels/SPARK2-2.3.0.cloudera4-1.cdh5.13.3.p0.611179/lib/spark2/python/lib/pyspark.zip/pyspark/accumulators.py", line 251, in authenticate_and_accum_updates
>     received_token = self.rfile.read(len(auth_token))
> TypeError: object of type 'NoneType' has no len()
> {code}
>  
> The error happens here:
> https://github.com/apache/spark/blob/cb90617f894fd51a092710271823ec7d1cd3a668/python/pyspark/accumulators.py#L254
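>
> The failing expression is len(auth_token): with an insecure, user-supplied
> gateway there is no token, so auth_token is None, and len(None) raises
> exactly this error:
> {code:python}
> auth_token = None  # what an insecure py4j gateway yields
> len(auth_token)    # TypeError: object of type 'NoneType' has no len()
> {code}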
> The PySpark code was just running a simple pipeline:
> binary_rdd = sc.binaryRecords(full_file_path, record_length).map(lambda .. )
> and then converting it to a dataframe and running a count on it.
> The error seems to be flaky - on the next rerun it didn't happen. (But 
> accumulators don't actually work.)



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org
