A JVM can easily be limited in how much memory it uses with the -Xmx
parameter, but Python doesn't have memory limits built in in such a
first-class way.  Maybe the memory limits aren't making it to the python
executors.

What was your SPARK_MEM setting?  The JVM below seems to be using 603201
(pages?) and the 3 large python processes each are using ~1800000 (pages?).
 I'm unsure the units that the OOM killer's RSS column is in.  Could be
either pages (4kb each) or bytes.


Apr  8 11:19:19 bennett kernel: [86368.978326] [ 2348]  1002  2348    12573
    2102      22        0             0 python
Apr  8 11:19:19 bennett kernel: [86368.978329] [ 2349]  1002  2349    12573
    2101      22        0             0 python
Apr  8 11:19:19 bennett kernel: [86368.978332] [ 2350]  1002  2350    12573
    2101      22        0             0 python
Apr  8 11:19:19 bennett kernel: [86368.978336] [ 5115]  1002  5115    12571
    2101      22        0             0 python
Apr  8 11:19:19 bennett kernel: [86368.978339] [ 5116]  1002  5116    12571
    2101      22        0             0 python
Apr  8 11:19:19 bennett kernel: [86368.978341] [ 5117]  1002  5117    12571
    2101      22        0             0 python
Apr  8 11:19:19 bennett kernel: [86368.978344] [ 7725]  1002  7725    12570
    2098      22        0             0 python
Apr  8 11:19:19 bennett kernel: [86368.978347] [ 7726]  1002  7726    12570
    2098      22        0             0 python
Apr  8 11:19:19 bennett kernel: [86368.978350] [ 7727]  1002  7727    12570
    2098      22        0             0 python
Apr  8 11:19:19 bennett kernel: [86368.978353] [10324]  1002 10324    12570
    2098      23        0             0 python
Apr  8 11:19:19 bennett kernel: [86368.978356] [10325]  1002 10325    12570
    2098      23        0             0 python
Apr  8 11:19:19 bennett kernel: [86368.978359] [10326]  1002 10326    12570
    2098      23        0             0 python
Apr  8 11:19:19 bennett kernel: [86368.978362] [12668]  1002 12668   603201
   47932     190        0             0 java
Apr  8 11:19:19 bennett kernel: [86368.978366] [13295]  1002 13295    12570
    2100      23        0             0 python
Apr  8 11:19:19 bennett kernel: [86368.978368] [13296]  1002 13296    12570
    2100      23        0             0 python
Apr  8 11:19:19 bennett kernel: [86368.978371] [13297]  1002 13297    12570
    2100      23        0             0 python
Apr  8 11:19:19 bennett kernel: [86368.978375] [15192]  1002 15192    12570
    2098      23        0             0 python
Apr  8 11:19:19 bennett kernel: [86368.978377] [15193]  1002 15193    12570
    2098      23        0             0 python
Apr  8 11:19:19 bennett kernel: [86368.978379] [15195]  1002 15195    12570
    2098      23        0             0 python
Apr  8 11:19:19 bennett kernel: [86368.978381] [15198]  1002 15198  1845471
 1818463    3573        0             0 python
Apr  8 11:19:19 bennett kernel: [86368.978383] [15200]  1002 15200  1710479
 1686492    3316        0             0 python
Apr  8 11:19:19 bennett kernel: [86368.978384] [15201]  1002 15201  1788470
 1762344    3463        0             0 python
Apr  8 11:19:19 bennett kernel: [86368.978386] Out of memory: Kill process
15198 (python) score 221 or sacrifice child
Apr  8 11:19:19 bennett kernel: [86368.978389] Killed process 15198
(python) total-vm:7381884kB, anon-rss:7273852kB, file-rss:0kB


On Tue, Apr 8, 2014 at 2:56 PM, Brad Miller <bmill...@eecs.berkeley.edu>wrote:

> Hi All,
>
> I poked around a bit more to (1) confirm my suspicions that the crash
> was related to memory consumption and (2) figure out why there is no
> error shown in 12_stderr, the spark executor log file from the
> executors on bennett.research.intel.research.net.
>
> The syslog file (from /var/log/syslog on bennett, attached) shows that
> the machine ran out of memory, the memory was mostly consumed by 1
> java process and 3 python processes (I am running pyspark with 3 cores
> per machine), and then the kernel began killing java and python
> processes to ease memory pressure.  It seems likely that these
> processes were the spark processes, and there's no errors recorded in
> 12_stderr because the process was killed by the OS (rather than
> experiencing an unhandled "cannot allocate memory" exception).
>
> I'm a little confused how Spark could consume so much memory during
> the reduce phase of the shuffle.  Shouldn't Spark remain within the
> SPARK_MEM limitations on memory consumption, and spill to disk in the
> event that there isn't enough memory?
>
> -Brad
>
>
> On Tue, Apr 8, 2014 at 12:50 PM, Brad Miller <bmill...@eecs.berkeley.edu>
> wrote:
> > Hi Patrick,
> >
> >> The shuffle data is written through the buffer cache of the operating
> >> system, so you would expect the files to show up there immediately and
> >> probably to show up as being their full size when you do "ls". In
> reality
> >> though these are likely residing in the OS cache and not on disk.
> >
> > I see.  Perhaps the memory consumption is related to this?
> >
> >> Could you paste the error here?
> >
> > While I have definitely seen "cannot allocate memory" errors while
> > trying to do this, I am unable to reproduce one now.  Instead, I am
> > able to produce "most recent failure: unknown" (see full error
> > displayed in my iPython session below).  Initially, I assumed there
> > was some sort of non-determinism which caused the error to
> > occasionally be "unknown", but now I realize that it may have been a
> > consistent change which occurred when I updated to the latest
> > brach-0.9 (previously I was running a version I pulled around March
> > 10th).
> >
> > Py4JJavaError: An error occurred while calling o274.collect.
> > : org.apache.spark.SparkException: Job aborted: Task 2.0:29 failed 4
> > times (most recent failure: unknown)
> >     at
> org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1020)
> >     at
> org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1018)
> >     at
> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
> >     at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
> >     at org.apache.spark.scheduler.DAGScheduler.org
> $apache$spark$scheduler$DAGScheduler$$abortStage(DAGScheduler.scala:1018)
> >     at
> org.apache.spark.scheduler.DAGScheduler$$anonfun$processEvent$10.apply(DAGScheduler.scala:604)
> >     at
> org.apache.spark.scheduler.DAGScheduler$$anonfun$processEvent$10.apply(DAGScheduler.scala:604)
> >     at scala.Option.foreach(Option.scala:236)
> >     at
> org.apache.spark.scheduler.DAGScheduler.processEvent(DAGScheduler.scala:604)
> >     at
> org.apache.spark.scheduler.DAGScheduler$$anonfun$start$1$$anon$2$$anonfun$receive$1.applyOrElse(DAGScheduler.scala:190)
> >     at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
> >     at akka.actor.ActorCell.invoke(ActorCell.scala:456)
> >     at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
> >     at akka.dispatch.Mailbox.run(Mailbox.scala:219)
> >     at
> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
> >     at
> scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> >     at
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
> >     at
> scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
> >     at
> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> >
> >> From the logs it looks like your executor has died. Would you be able to
> >> paste the log from the executor with the exact failure? It would show
> up in
> >> the /work directory inside of spark's directory on the cluster.
> >
> > I've attached the logging output from the driver from when I re-ran
> > the join operation this morning.  It seems that specific, individual
> > workers (or perhaps executors is the right term?) begin to die and
> > then are re-launched by the master/driver.  When I examine the
> > app-xxx...xxx folder corresponding to this job on the first worker to
> > fail (bennett.research.intel-research.net), there are several numbered
> > folders inside (12, 15, 21, 22, 23) which seem to correspond to each
> > invocation of the executor as recorded in the driver log.  stdout is
> > consistently empty, and stderr is not.  I have attached all of these
> > logs as <executor_id>_stderr.
> >
> > Surprisingly, 12_stderr does not record any sort of error, although
> > 15_stderr, 21_stderr, and 22_stderr do.  These errors are all of the
> > form:
> >
> > 14/04/08 11:19:42 ERROR Executor: Uncaught exception in thread
> > Thread[stdin writer for python,5,main]
> > org.apache.spark.FetchFailedException: Fetch failed: null 0 -1 44
> >     at
> org.apache.spark.MapOutputTracker$$anonfun$org$apache$spark$MapOutputTracker$$convertMapStatuses$1.apply(MapOutputTracker.scala:316)
> >     at
> org.apache.spark.MapOutputTracker$$anonfun$org$apache$spark$MapOutputTracker$$convertMapStatuses$1.apply(MapOutputTracker.scala:314)
> >     at
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
> >     at
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
> >     at
> scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
> >     at
> scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
> >     at
> scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
> >     at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:108)
> >     at
> org.apache.spark.MapOutputTracker$.org$apache$spark$MapOutputTracker$$convertMapStatuses(MapOutputTracker.scala:313)
> >     at
> org.apache.spark.MapOutputTracker.getServerStatuses(MapOutputTracker.scala:139)
> >     at
> org.apache.spark.BlockStoreShuffleFetcher.fetch(BlockStoreShuffleFetcher.scala:43)
> >     at org.apache.spark.rdd.ShuffledRDD.compute(ShuffledRDD.scala:61)
> >     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:241)
> >     at org.apache.spark.rdd.RDD.iterator(RDD.scala:232)
> >     at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31)
> >     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:241)
> >     at org.apache.spark.rdd.RDD.iterator(RDD.scala:232)
> >     at
> org.apache.spark.api.python.PythonRDD$$anon$2.run(PythonRDD.scala:85)
> > Caused by: java.lang.Exception: Missing an output location for shuffle 0
> >     at
> org.apache.spark.MapOutputTracker$$anonfun$org$apache$spark$MapOutputTracker$$convertMapStatuses$1.apply(MapOutputTracker.scala:317)
> >     ... 17 more
> >
> > Note that in "Fetch failed: null 0 -1 44" the last numeral varies in
> > each of the errors.  I have also attached an executor stderr trace
> > from malkin.research.intel-research.net, which one of several
> > executors to crash after bennett.  Notice that this trace does contain
> > errors, although they seem to be saying that there's trouble
> > connecting to bennett.
> >
> > If anyone would like additional information, please let me know.
> >
> > best,
> > -Brad
> >
> > On Mon, Apr 7, 2014 at 9:40 PM, Patrick Wendell <pwend...@gmail.com>
> wrote:
> >>
> >>
> >>
> >> On Mon, Apr 7, 2014 at 7:37 PM, Brad Miller <bmill...@eecs.berkeley.edu
> >
> >> wrote:
> >>>
> >>> I am running the latest version of PySpark branch-0.9 and having some
> >>> trouble with join.
> >>>
> >>> One RDD is about 100G (25GB compressed and serialized in memory) with
> >>> 130K records, the other RDD is about 10G (2.5G compressed and
> >>> serialized in memory) with 330K records.  I load each RDD from HDFS,
> >>> invoke keyBy to key each record, and then attempt to join the RDDs.
> >>>
> >>> The join consistently crashes at the beginning of the reduce phase.
> >>> Note that when joining the 10G RDD to itself there is no problem.
> >>>
> >>> Prior to the crash, several suspicious things happen:
> >>>
> >>> -All map output files from the map phase of the join are written to
> >>> spark.local.dir, even though there should be plenty of memory left to
> >>> contain the map output.  I am reasonably sure *all* map outputs are
> >>> written to disk because the size of the map output spill directory
> >>> matches the size of the shuffle write (as displayed in the user
> >>> interface) for each machine.
> >>
> >>
> >> The shuffle data is written through the buffer cache of the operating
> >> system, so you would expect the files to show up there immediately and
> >> probably to show up as being their full size when you do "ls". In
> reality
> >> though these are likely residing in the OS cache and not on disk.
> >>
> >>>
> >>> -In the beginning of the reduce phase of the join, memory consumption
> >>> on each work spikes and each machine runs out of memory (as evidenced
> >>> by a "Cannon allocate memory" exception in Java).  This is
> >>> particularly surprising since each machine has 30G of ram and each
> >>> spark worker has only been allowed 10G.
> >>
> >>
> >> Could you paste the error here?
> >>
> >>>
> >>> -In the web UI both the "Shuffle Spill (Memory)" and "Shuffle Spill
> >>> (Disk)" fields for each machine remain at 0.0 despite shuffle files
> >>> being written into spark.local.dir.
> >>
> >>
> >> Shuffle spill is different than the shuffle files written to
> >> spark.local.dir. Shuffle spilling is for aggregations that occur on the
> >> reduce side of the shuffle. A join like this might not see any spilling.
> >>
> >>
> >>
> >> From the logs it looks like your executor has died. Would you be able to
> >> paste the log from the executor with the exact failure? It would show
> up in
> >> the /work directory inside of spark's directory on the cluster.
>

Reply via email to