[ https://issues.apache.org/jira/browse/SPARK-22438?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Morten Hornbech updated SPARK-22438:
------------------------------------
    Description: 
We have a customer that uses Spark as an engine for running SQL on a collection 
of small datasets, typically no greater than a few thousand rows. Recently we 
started observing out-of-memory errors on some new workloads. Even though the 
datasets were only a few kilobytes, the job would almost immediately spike to > 
10 GB of memory usage, producing an out-of-memory error on the modest hardware 
(2 CPUs, 16 GB RAM) in use. Using larger hardware and allocating more memory 
to Spark (4 CPUs, 32 GB RAM) made the job complete, but still with unreasonably 
high memory usage.

The query involved was a left join on two datasets. In some, but not all, cases 
we were able to remove or reduce the problem by rewriting the query to use an 
exists sub-select instead (an illustrative rewrite is shown at the end of this 
description). After a lot of debugging we were able to reproduce the problem 
locally with the following test:

{code:java}
import org.apache.spark.sql.SparkSession
import org.apache.spark.storage.StorageLevel

case class Data(value: String)

val session = SparkSession.builder.master("local[1]").getOrCreate()
import session.implicits._

// Two tiny datasets: 500 rows and 1 row.
val foo = session.createDataset((1 to 500).map(i => Data(i.toString)))
val bar = session.createDataset((1 to 1).map(i => Data(i.toString)))

// Caching "foo" is one of the ingredients needed to trigger the error.
foo.persist(StorageLevel.MEMORY_ONLY)

foo.createTempView("foo")
bar.createTempView("bar")

// The left join plus the coalesce below are the other ingredients.
val result = session.sql(
  "select * from bar left join foo on bar.value = foo.value")
result.coalesce(2).collect()
{code}

Running this produces the error below:

{code:java}
java.lang.OutOfMemoryError: Unable to acquire 28 bytes of memory, got 0
   at org.apache.spark.memory.MemoryConsumer.allocatePage(MemoryConsumer.java:127)
   at org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.acquireNewPageIfNecessary(UnsafeExternalSorter.java:372)
   at org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.insertRecord(UnsafeExternalSorter.java:396)
   at org.apache.spark.sql.execution.UnsafeExternalRowSorter.insertRow(UnsafeExternalRowSorter.java:109)
   at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.sort_addToSorter$(Unknown Source)
   at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
   at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
   at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:395)
   at org.apache.spark.sql.execution.RowIteratorFromScala.advanceNext(RowIterator.scala:83)
   at org.apache.spark.sql.execution.joins.SortMergeJoinScanner.advancedBufferedToRowWithNullFreeJoinKey(SortMergeJoinExec.scala:774)
   at org.apache.spark.sql.execution.joins.SortMergeJoinScanner.<init>(SortMergeJoinExec.scala:649)
   at org.apache.spark.sql.execution.joins.SortMergeJoinExec$$anonfun$doExecute$1.apply(SortMergeJoinExec.scala:198)
   at org.apache.spark.sql.execution.joins.SortMergeJoinExec$$anonfun$doExecute$1.apply(SortMergeJoinExec.scala:136)
   at org.apache.spark.rdd.ZippedPartitionsRDD2.compute(ZippedPartitionsRDD.scala:89)
   at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
   at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
   at org.apache.spark.rdd.CoalescedRDD$$anonfun$compute$1.apply(CoalescedRDD.scala:100)
   at org.apache.spark.rdd.CoalescedRDD$$anonfun$compute$1.apply(CoalescedRDD.scala:99)
   at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:434)
   at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
   at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:234)
   at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:228)
   at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:827)
   at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:827)
   at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
   at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
   at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
   at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
   at org.apache.spark.scheduler.Task.run(Task.scala:108)
   at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:335)
   at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
   at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
   at java.lang.Thread.run(Thread.java:745)
{code}

The exact failure point varies with the number of threads given to Spark, the 
"coalesce" value and the number of rows in "foo". Using an inner join, removing 
the call to persist, or removing the call to coalesce (or replacing it with 
repartition) will each independently make the error go away.
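
For illustration, each of the following one-line variants of the repro runs to 
completion on our setup (a sketch only; as noted above, the exact behavior 
varies with thread count, the coalesce value and the row count):

{code:java}
// Each variant independently avoids the OOM in the repro above.

// 1. Replace coalesce with repartition, which inserts a shuffle:
result.repartition(2).collect()

// 2. Drop the coalesce entirely:
result.collect()

// 3. Use an inner join instead of a left join:
session.sql("select * from bar join foo on bar.value = foo.value")
  .coalesce(2).collect()

// 4. Keep the query as-is but skip foo.persist(StorageLevel.MEMORY_ONLY).
{code}

Our guess, and it is only a guess, is that coalesce matters because it narrows 
the dependency without a shuffle, so many upstream join partitions are computed 
inside a single task and share that task's memory, whereas repartition gives 
each output partition its own shuffle boundary.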

The reason persist and coalesce are used in the workload at all is that they 
are part of a more general Spark-based processing engine, not limited to these 
small datasets. The workaround is therefore not as simple as it may seem, since 
we cannot tailor the Spark code to this specific case.
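
For completeness, the exists rewrite mentioned above looked roughly like the 
following (illustrative only; it is equivalent to the left join only where the 
join acts as an existence check and no columns from "foo" are needed, which is 
presumably why it helped in some but not all cases):

{code:java}
// Hypothetical rewrite of the repro query as an exists sub-select.
val rewritten = session.sql(
  "select * from bar where exists (select 1 from foo where foo.value = bar.value)")
rewritten.coalesce(2).collect()
{code}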



> OutOfMemoryError on very small data sets
> ----------------------------------------
>
>                 Key: SPARK-22438
>                 URL: https://issues.apache.org/jira/browse/SPARK-22438
>             Project: Spark
>          Issue Type: Bug
>          Components: Spark Core
>    Affects Versions: 2.2.0
>            Reporter: Morten Hornbech
>            Priority: Critical
>


