[ https://issues.apache.org/jira/browse/SPARK-25992?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16687726#comment-16687726 ]

Abdeali Kothari commented on SPARK-25992:
-----------------------------------------

[~hyukjin.kwon] I tried quite a bit but was unable to produce a minimal
reproducible example.
I posted it here hoping to get some understanding of when this can occur, so
that I can simplify my code and figure out what is happening.

I went through the pyspark code: the error occurs in _UpdateRequestHandler,
which, as I understand it, runs only on the application master and not on the
workers. So it looks like an accumulator that was registered on the master is
suddenly no longer visible on the master (i.e. the accumulator was lost from
_accumulatorRegistry).
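
To make the failure mode concrete, here is a toy sketch (my own simplification, not the actual pyspark source) of the lookup that the traceback below shows failing inside accumulators.py:
{code:python}
# Toy sketch (my simplification, not pyspark source) of the lookup that
# _UpdateRequestHandler.handle performs for each incoming update.
_accumulatorRegistry = {}  # module-level dict, one per Python process

def apply_update(aid, update):
    # pyspark effectively does: _accumulatorRegistry[aid] += update
    # If accumulator id `aid` was registered in a *different* process, it is
    # absent from this process's registry and the lookup raises KeyError.
    _accumulatorRegistry[aid] += update

apply_update(0, 1)  # raises KeyError: 0, matching the traceback below
{code}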

I am creating a single spark session and using it with celery - similar to what
spark-celery does: [https://github.com/gregbaker/spark-celery]

Here is the celery loader I use, in case it helps:
{code:python}
from celery.loaders.app import AppLoader as CeleryAppLoader


class AppLoader(CeleryAppLoader):
    def on_worker_init(self):
        # Initialize spark for this worker - once, at worker startup
        try:
            import findspark
            findspark.init()  # locate the Spark install and set up sys.path
            import pyspark
        except ImportError:
            import pyspark  # fall back to a pyspark already on the path

        spark_builder = pyspark.sql.SparkSession.builder
        spark_builder = spark_builder.appName("MySpark")
        self.spark = spark_builder.getOrCreate()
{code}
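
For reference, I wire this loader into the celery app roughly like this (the dotted module path here is a simplified placeholder for my project layout):
{code:python}
from celery import Celery

# "myapp.loaders.AppLoader" is a placeholder path to the loader class above;
# celery can also pick the loader up from the CELERY_LOADER env variable.
app = Celery("myapp", loader="myapp.loaders.AppLoader")
{code}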

I am using celery with a single worker: {{--concurrency 1}}

I think (and I may be mistaken) that celery uses subprocess/multiprocessing
internally to create a worker process and runs jobs inside that worker process.
Are there any known issues with this sequence: creating a spark session, then
creating a subprocess, then submitting a job (which uses accumulators) from
inside that subprocess?

I can see that _accumulatorRegistry is a global, module-level variable in
pyspark.accumulators - which means module reimports could cause a problem
(I am not sure whether creating a subprocess triggers a module reimport); see
the sketch below.
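
As a sanity check on that theory, here is a minimal, self-contained sketch (no pyspark involved) showing that a module-level dict written by a child process is never visible to the parent:
{code:python}
import multiprocessing

_registry = {}  # stands in for pyspark.accumulators._accumulatorRegistry

def register_in_child():
    _registry[0] = 100  # register an "accumulator" only in the child
    print("child sees:", _registry)   # child sees: {0: 100}

if __name__ == "__main__":
    p = multiprocessing.Process(target=register_in_child)
    p.start()
    p.join()
    # The parent's copy of the dict was never updated, so the same lookup
    # the update handler performs would raise KeyError: 0 here.
    print("parent sees:", _registry)  # parent sees: {}
{code}
If celery's worker process behaves like the child here, any accumulator it registers would be invisible to the process that owns the update server.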

Now that I think of it, _start_update_server is called during context creation.
Does that mean the update socket is created in the master process, so that even
if the accumulator is created in my subprocess, the master is the one receiving
updates from the workers? My master process would then throw an error that this
accumulator is unknown, because it was only ever registered in the subprocess.


Again: I do not know whether celery uses subprocess or something else like
asyncio/gevent... this is pure speculation.

> Accumulators giving KeyError in pyspark
> ---------------------------------------
>
>                 Key: SPARK-25992
>                 URL: https://issues.apache.org/jira/browse/SPARK-25992
>             Project: Spark
>          Issue Type: Bug
>          Components: PySpark
>    Affects Versions: 2.3.1
>            Reporter: Abdeali Kothari
>            Priority: Major
>
> I am using accumulators, and when I run my code I sometimes get warning
> messages. When I checked, nothing had been accumulated - I am not sure whether
> I lost data from the accumulator, or whether it worked and I can ignore this
> error.
> The message:
> {noformat}
> Exception happened during processing of request from ('127.0.0.1', 62099)
> Traceback (most recent call last):
>   File "/Users/abdealijk/anaconda3/lib/python3.6/socketserver.py", line 317, in _handle_request_noblock
>     self.process_request(request, client_address)
>   File "/Users/abdealijk/anaconda3/lib/python3.6/socketserver.py", line 348, in process_request
>     self.finish_request(request, client_address)
>   File "/Users/abdealijk/anaconda3/lib/python3.6/socketserver.py", line 361, in finish_request
>     self.RequestHandlerClass(request, client_address, self)
>   File "/Users/abdealijk/anaconda3/lib/python3.6/socketserver.py", line 696, in __init__
>     self.handle()
>   File "/usr/local/hadoop/spark2.3.1/python/pyspark/accumulators.py", line 238, in handle
>     _accumulatorRegistry[aid] += update
> KeyError: 0
> ----------------------------------------
> 2018-11-09 19:09:08 ERROR DAGScheduler:91 - Failed to update accumulators for task 0
> org.apache.spark.SparkException: EOF reached before Python server acknowledged
>       at org.apache.spark.api.python.PythonAccumulatorV2.merge(PythonRDD.scala:634)
>       at org.apache.spark.scheduler.DAGScheduler$$anonfun$updateAccumulators$1.apply(DAGScheduler.scala:1131)
>       at org.apache.spark.scheduler.DAGScheduler$$anonfun$updateAccumulators$1.apply(DAGScheduler.scala:1123)
>       at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>       at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
>       at org.apache.spark.scheduler.DAGScheduler.updateAccumulators(DAGScheduler.scala:1123)
>       at org.apache.spark.scheduler.DAGScheduler.handleTaskCompletion(DAGScheduler.scala:1206)
>       at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1820)
>       at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1772)
>       at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1761)
>       at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
> {noformat}


