[ https://issues.apache.org/jira/browse/SPARK-25992?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16687726#comment-16687726 ]
Abdeali Kothari commented on SPARK-25992:
-----------------------------------------

[~hyukjin.kwon] I tried a fair bit but was unable to produce a minimal reproducible example. I posted it here hoping to get some understanding of when this can occur, so I can simplify my code and figure out what is happening.

I went through the PySpark code: the error occurs in _UpdateRequestHandler, which, as I understand it, runs only on the application master and not on the workers. So it looks like an accumulator that was registered on the master is suddenly no longer seen on the master (i.e. the accumulator was lost from _accumulatorRegistry).

I am creating a single Spark session and using it with celery, similar to what spark-celery does: https://github.com/gregbaker/spark-celery

Here is the celery loader I use, in case it helps:

{code:python}
from celery.loaders.app import AppLoader

class SparkAppLoader(AppLoader):
    def on_worker_init(self):
        # Initialize Spark for the worker - once
        try:
            import findspark
            findspark.init()
            import pyspark
        except ImportError:
            import pyspark
        spark_builder = pyspark.sql.SparkSession.builder
        spark_builder = spark_builder.appName("MySpark")
        self.spark = spark_builder.getOrCreate()
{code}

I am running celery with a single worker (`--concurrency 1`).

I think (and may be mistaken) that celery internally uses subprocess/multiprocessing to create a worker and runs jobs inside that worker. Are there any known issues with this sequence:
1. Creating a Spark session,
2. Creating a subprocess,
3. Submitting a job (which uses accumulators) from that subprocess?

I can see that _accumulatorRegistry is a module-level global variable in pyspark.accumulators, which means that module reimports could cause a problem (I am not sure whether subprocess does a module reimport).

Now that I think of it, _start_update_server is called during context creation. Does that mean the socket is created in the master thread, so that if the accumulator is created in my subprocess, the master will still receive the updates from the workers?
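To illustrate the reimport concern (a hypothetical sketch, not Spark or celery code): a module-level dict such as pyspark.accumulators._accumulatorRegistry exists once per interpreter, so a child process that re-imports the module starts with an empty registry and cannot see ids registered in the parent. The `registry_demo` module name below is made up for the demonstration.

```python
import os
import subprocess
import sys
import tempfile

# Write a tiny module holding a module-level registry, standing in for
# pyspark.accumulators._accumulatorRegistry (hypothetical stand-in module).
tmp = tempfile.mkdtemp()
with open(os.path.join(tmp, "registry_demo.py"), "w") as f:
    f.write("_registry = {}\n")

sys.path.insert(0, tmp)
import registry_demo

registry_demo._registry[0] = "accumulator"  # register aid 0 in THIS interpreter
print("parent has aid 0:", 0 in registry_demo._registry)  # True

# A child process imports the module from scratch, so its registry is empty:
# a lookup of aid 0 there fails, which matches the KeyError in this issue.
env = dict(os.environ, PYTHONPATH=tmp)
out = subprocess.run(
    [sys.executable, "-c",
     "import registry_demo; print(0 in registry_demo._registry)"],
    env=env, capture_output=True, text=True,
)
print("child has aid 0:", out.stdout.strip())  # False
```

If celery's worker is such a separate process, anything registered there would likewise be invisible to the interpreter that owns the accumulator update server.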
So, my master thread would throw an error that this accumulator is unknown, because it was created in the subprocess? Again: I do not know whether celery uses subprocess or something else like asyncio/gevent ... this is pure speculation.

> Accumulators giving KeyError in pyspark
> ---------------------------------------
>
>                 Key: SPARK-25992
>                 URL: https://issues.apache.org/jira/browse/SPARK-25992
>             Project: Spark
>          Issue Type: Bug
>          Components: PySpark
>    Affects Versions: 2.3.1
>            Reporter: Abdeali Kothari
>            Priority: Major
>
> I am using accumulators, and when I run my code I sometimes get some warning
> messages. When I checked, there was nothing accumulated - I am not sure whether
> I lost info from the accumulator, or whether it worked and I can ignore this error.
> The message:
> {noformat}
> Exception happened during processing of request from ('127.0.0.1', 62099)
> Traceback (most recent call last):
>   File "/Users/abdealijk/anaconda3/lib/python3.6/socketserver.py", line 317, in _handle_request_noblock
>     self.process_request(request, client_address)
>   File "/Users/abdealijk/anaconda3/lib/python3.6/socketserver.py", line 348, in process_request
>     self.finish_request(request, client_address)
>   File "/Users/abdealijk/anaconda3/lib/python3.6/socketserver.py", line 361, in finish_request
>     self.RequestHandlerClass(request, client_address, self)
>   File "/Users/abdealijk/anaconda3/lib/python3.6/socketserver.py", line 696, in __init__
>     self.handle()
>   File "/usr/local/hadoop/spark2.3.1/python/pyspark/accumulators.py", line 238, in handle
>     _accumulatorRegistry[aid] += update
> KeyError: 0
> ----------------------------------------
> 2018-11-09 19:09:08 ERROR DAGScheduler:91 - Failed to update accumulators for task 0
> org.apache.spark.SparkException: EOF reached before Python server acknowledged
>     at org.apache.spark.api.python.PythonAccumulatorV2.merge(PythonRDD.scala:634)
>     at org.apache.spark.scheduler.DAGScheduler$$anonfun$updateAccumulators$1.apply(DAGScheduler.scala:1131)
>     at org.apache.spark.scheduler.DAGScheduler$$anonfun$updateAccumulators$1.apply(DAGScheduler.scala:1123)
>     at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>     at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
>     at org.apache.spark.scheduler.DAGScheduler.updateAccumulators(DAGScheduler.scala:1123)
>     at org.apache.spark.scheduler.DAGScheduler.handleTaskCompletion(DAGScheduler.scala:1206)
>     at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1820)
>     at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1772)
>     at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1761)
>     at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
> {noformat}

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)