This is an automated email from the ASF dual-hosted git repository.
hongze pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git
The following commit(s) were added to refs/heads/main by this push:
new 55645156c4 [GLUTEN-11169][VL] Fix OOM error not thrown when global off-heap memory is running out by off-heap BHJ (#11208)
55645156c4 is described below
commit 55645156c4cd8f779aad7aba6fae9c3f1bb7bd78
Author: Hongze Zhang <[email protected]>
AuthorDate: Fri Nov 28 16:01:00 2025 +0000
[GLUTEN-11169][VL] Fix OOM error not thrown when global off-heap memory is running out by off-heap BHJ (#11208)
---
.../backendsapi/velox/VeloxListenerApi.scala | 6 +-
.../spark/memory/GlobalOffHeapMemorySuite.scala | 10 +--
.../UnsafeColumnarBuildSideRelationTest.scala | 73 +++++++++++++++++---
.../gluten/memory/memtarget/MemoryTargets.java | 5 ++
.../memory/memtarget/ThrowOnOomMemoryTarget.java | 79 +++++++++++-----------
.../apache/spark/memory/GlobalOffHeapMemory.scala | 11 ++-
.../spark/memory/GlobalOffHeapMemoryTarget.scala | 6 ++
7 files changed, 125 insertions(+), 65 deletions(-)
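
The crux of the change: GlobalOffHeapMemory now borrows through
MemoryTargets.throwOnOom(MemoryTargets.global()), so exhausting the global
off-heap pool surfaces as ThrowOnOomMemoryTarget.OutOfMemoryException (whose
message carries config and per-target usage details) instead of a generic
GlutenException. A minimal sketch of the new failure mode; the object name and
the 500 MiB figure are illustrative, not from the patch:

    import org.apache.gluten.memory.memtarget.ThrowOnOomMemoryTarget.OutOfMemoryException
    import org.apache.spark.memory.GlobalOffHeapMemory
    import org.apache.spark.network.util.ByteUnit

    object AcquireOrOom {
      def main(args: Array[String]): Unit = {
        try {
          // Ask for more than the configured off-heap pool can serve.
          GlobalOffHeapMemory.acquire(ByteUnit.MiB.toBytes(500))
        } catch {
          case e: OutOfMemoryException =>
            // The message now lists the relevant off-heap configs and dumps
            // per-target memory usage.
            println(e.getMessage)
        }
      }
    }
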
diff --git a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxListenerApi.scala b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxListenerApi.scala
index 3488ef5653..585f6d736d 100644
--- a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxListenerApi.scala
+++ b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxListenerApi.scala
@@ -27,6 +27,7 @@ import org.apache.gluten.init.NativeBackendInitializer
import org.apache.gluten.jni.{JniLibLoader, JniWorkspace}
import org.apache.gluten.memory.{MemoryUsageRecorder, SimpleMemoryUsageRecorder}
import org.apache.gluten.memory.listener.ReservationListener
+import org.apache.gluten.memory.memtarget.MemoryTarget
import org.apache.gluten.monitor.VeloxMemoryProfiler
import org.apache.gluten.udf.UdfJniWrapper
import org.apache.gluten.utils._
@@ -276,15 +277,16 @@ object VeloxListenerApi {
private def newGlobalOffHeapMemoryListener(): ReservationListener = {
new ReservationListener {
private val recorder: MemoryUsageRecorder = new SimpleMemoryUsageRecorder()
+ private val target: MemoryTarget = GlobalOffHeapMemory.target
override def reserve(size: Long): Long = {
- GlobalOffHeapMemory.acquire(size)
+ assert(target.borrow(size) == size)
recorder.inc(size)
size
}
override def unreserve(size: Long): Long = {
- GlobalOffHeapMemory.release(size)
+ assert(target.repay(size) == size)
recorder.inc(-size)
size
}
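
The asserts above are safe because the wrapped target is all-or-throw: a
shortfall never returns, it raises OutOfMemoryException. A sketch of that
invariant, assuming only the MemoryTarget.borrow contract shown in this patch
(the object and method names are illustrative):

    import org.apache.gluten.memory.memtarget.{MemoryTarget, MemoryTargets}

    object ReserveExactly {
      private val target: MemoryTarget = MemoryTargets.throwOnOom(MemoryTargets.global())

      // borrow on the wrapped target either grants the full request or throws
      // OutOfMemoryException, so a partial grant can never reach the assert.
      def reserveExactly(size: Long): Unit = assert(target.borrow(size) == size)
    }
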
diff --git a/backends-velox/src/test/scala/org/apache/spark/memory/GlobalOffHeapMemorySuite.scala b/backends-velox/src/test/scala/org/apache/spark/memory/GlobalOffHeapMemorySuite.scala
index 084386fcd7..95053e3e65 100644
--- a/backends-velox/src/test/scala/org/apache/spark/memory/GlobalOffHeapMemorySuite.scala
+++ b/backends-velox/src/test/scala/org/apache/spark/memory/GlobalOffHeapMemorySuite.scala
@@ -17,8 +17,8 @@
package org.apache.spark.memory
import org.apache.gluten.config.GlutenCoreConfig
-import org.apache.gluten.exception.GlutenException
import org.apache.gluten.memory.memtarget.{Spillers, TreeMemoryTarget}
+import org.apache.gluten.memory.memtarget.ThrowOnOomMemoryTarget.OutOfMemoryException
import org.apache.gluten.memory.memtarget.spark.TreeMemoryConsumers
import org.apache.spark.TaskContext
@@ -54,11 +54,11 @@ class GlobalOffHeapMemorySuite extends AnyFunSuite with BeforeAndAfterAll {
assert(consumer.borrow(300) == 300)
GlobalOffHeapMemory.acquire(50)
GlobalOffHeapMemory.acquire(40)
- assertThrows[GlutenException](GlobalOffHeapMemory.acquire(30))
- assertThrows[GlutenException](GlobalOffHeapMemory.acquire(11))
+ assertThrows[OutOfMemoryException](GlobalOffHeapMemory.acquire(30))
+ assertThrows[OutOfMemoryException](GlobalOffHeapMemory.acquire(11))
GlobalOffHeapMemory.acquire(10)
GlobalOffHeapMemory.acquire(0)
- assertThrows[GlutenException](GlobalOffHeapMemory.acquire(1))
+ assertThrows[OutOfMemoryException](GlobalOffHeapMemory.acquire(1))
}
}
@@ -113,7 +113,7 @@ class GlobalOffHeapMemorySuite extends AnyFunSuite with BeforeAndAfterAll {
Spillers.NOOP,
Collections.emptyMap())
assert(consumer.borrow(300) == 300)
- assertThrows[GlutenException](GlobalOffHeapMemory.acquire(200))
+ assertThrows[OutOfMemoryException](GlobalOffHeapMemory.acquire(200))
assert(consumer.repay(100) == 100)
GlobalOffHeapMemory.acquire(200)
}
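
The expected grants in the first test imply a 400-byte cap on the pool (the
suite's configuration is outside this hunk); the accounting works out as:

    // used after consumer.borrow(300): 300
    // used after acquire(50):          350
    // used after acquire(40):          390
    // acquire(30) or acquire(11):      would exceed 400 -> OutOfMemoryException
    // used after acquire(10):          400 (exactly at the cap)
    // acquire(1):                      OutOfMemoryException
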
diff --git a/backends-velox/src/test/scala/org/apache/spark/sql/execution/unsafe/UnsafeColumnarBuildSideRelationTest.scala b/backends-velox/src/test/scala/org/apache/spark/sql/execution/unsafe/UnsafeColumnarBuildSideRelationTest.scala
index 6d0448fd84..252bf451f6 100644
--- a/backends-velox/src/test/scala/org/apache/spark/sql/execution/unsafe/UnsafeColumnarBuildSideRelationTest.scala
+++ b/backends-velox/src/test/scala/org/apache/spark/sql/execution/unsafe/UnsafeColumnarBuildSideRelationTest.scala
@@ -17,10 +17,13 @@
package org.apache.spark.sql.execution.unsafe
import org.apache.gluten.memory.arrow.alloc.ArrowBufferAllocators
+import org.apache.gluten.memory.memtarget.ThrowOnOomMemoryTarget.OutOfMemoryException
import org.apache.spark.{SparkConf, SparkEnv}
+import org.apache.spark.memory.GlobalOffHeapMemory
+import org.apache.spark.network.util.ByteUnit
import org.apache.spark.serializer.{JavaSerializer, KryoSerializer}
-import org.apache.spark.sql.catalyst.expressions.AttributeReference
+import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference}
import org.apache.spark.sql.catalyst.plans.physical.IdentityBroadcastMode
import org.apache.spark.sql.execution.joins.HashedRelationBroadcastMode
import org.apache.spark.sql.test.SharedSparkSession
@@ -29,6 +32,9 @@ import org.apache.spark.unsafe.Platform
import java.util
+import scala.collection.mutable
+import scala.util.Random
+
class UnsafeColumnarBuildSideRelationTest extends SharedSparkSession {
override protected def sparkConf: SparkConf = {
super.sparkConf
@@ -38,12 +44,18 @@ class UnsafeColumnarBuildSideRelationTest extends SharedSparkSession {
var unsafeRelWithIdentityMode: UnsafeColumnarBuildSideRelation = _
var unsafeRelWithHashMode: UnsafeColumnarBuildSideRelation = _
+ var output: Seq[Attribute] = _
var sampleBytes: Array[Array[Byte]] = _
+ var initialGlobalBytes: Long = _
private def toUnsafeByteArray(bytes: Array[Byte]): UnsafeByteArray = {
val buf = ArrowBufferAllocators.globalInstance().buffer(bytes.length)
- buf.setBytes(0, bytes, 0, bytes.length);
- new UnsafeByteArray(buf, bytes.length.toLong)
+ buf.setBytes(0, bytes, 0, bytes.length)
+ try {
+ new UnsafeByteArray(buf, bytes.length.toLong)
+ } finally {
+ buf.close()
+ }
}
private def toByteArray(unsafeByteArray: UnsafeByteArray): Array[Byte] = {
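
The new try/finally in toUnsafeByteArray reads as Arrow reference-count
hygiene: assuming UnsafeByteArray retains its own reference to the buffer
(which the GC-based check added to afterAll below suggests), the creator can
drop its initial reference right away. A hypothetical refcount flow:

    // allocator.buffer(len)    -> refCount = 1 (held by this method)
    // new UnsafeByteArray(..)  -> refCount = 2 (retained by the wrapper)
    // buf.close()              -> refCount = 1 (owned solely by the wrapper)
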
@@ -59,17 +71,48 @@ class UnsafeColumnarBuildSideRelationTest extends SharedSparkSession {
override def beforeAll(): Unit = {
super.beforeAll()
- val a = AttributeReference("a", StringType, nullable = false, null)()
- val output = Seq(a)
- sampleBytes = Array("12345".getBytes(), "7890".getBytes)
- unsafeRelWithIdentityMode = UnsafeColumnarBuildSideRelation(
+ initialGlobalBytes = GlobalOffHeapMemory.currentBytes()
+ output = Seq(AttributeReference("a", StringType, nullable = false, null)())
+ sampleBytes = Array(randomBytes(10), randomBytes(100))
+ unsafeRelWithIdentityMode = newUnsafeRelationWithIdentityMode(sampleBytes: _*)
+ unsafeRelWithHashMode = newUnsafeRelationWithHashMode(sampleBytes: _*)
+ }
+
+ override protected def afterAll(): Unit = {
+ // Make sure all the underlying UnsafeByteArray instances become GC-unreachable and
+ // get released after a full GC.
+ unsafeRelWithIdentityMode = null
+ unsafeRelWithHashMode = null
+ System.gc()
+ Thread.sleep(500)
+ // FIXME: This should be zero. We had to assert against the initial bytes because
+ // there were some allocations left over from previously run suites.
+ assert(GlobalOffHeapMemory.currentBytes() == initialGlobalBytes)
+ }
+
+ private def randomBytes(size: Int): Array[Byte] = {
+ val array = new Array[Byte](size)
+ val random = new Random()
+ random.nextBytes(array)
+ array
+ }
+
+ private def newUnsafeRelationWithIdentityMode(
+ bytes: Array[Byte]*): UnsafeColumnarBuildSideRelation = {
+ require(bytes.nonEmpty)
+ UnsafeColumnarBuildSideRelation(
output,
- sampleBytes.map(a => toUnsafeByteArray(a)),
+ bytes.map(a => toUnsafeByteArray(a)),
IdentityBroadcastMode
)
- unsafeRelWithHashMode = UnsafeColumnarBuildSideRelation(
+ }
+
+ private def newUnsafeRelationWithHashMode(
+ bytes: Array[Byte]*): UnsafeColumnarBuildSideRelation = {
+ require(bytes.nonEmpty)
+ UnsafeColumnarBuildSideRelation(
output,
- sampleBytes.map(a => toUnsafeByteArray(a)),
+ bytes.map(a => toUnsafeByteArray(a)),
HashedRelationBroadcastMode(output, isNullAware = false)
)
}
@@ -124,4 +167,14 @@ class UnsafeColumnarBuildSideRelationTest extends SharedSparkSession {
sampleBytes.asInstanceOf[Array[AnyRef]]))
}
+ test("Should throw OOM when off-heap memory is running out") {
+ // 500 MiB > 200 MiB so OOM should be thrown.
+ val relations = mutable.ListBuffer[UnsafeColumnarBuildSideRelation]()
+ assertThrows[OutOfMemoryException] {
+ for (i <- 0 until 10) {
+ relations += newUnsafeRelationWithHashMode(randomBytes(ByteUnit.MiB.toBytes(50).toInt))
+ }
+ }
+ relations.clear()
+ }
}
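
Per the test's own comment the pool is 200 MiB, so the loop's cumulative
demand of 10 x 50 MiB = 500 MiB must fail early:

    // after relation 1:  50 MiB
    // after relation 2: 100 MiB
    // after relation 3: 150 MiB
    // relation 4 would reach the 200 MiB cap, so with any per-relation
    // overhead the OutOfMemoryException fires by then, long before i = 10.
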
diff --git a/gluten-core/src/main/java/org/apache/gluten/memory/memtarget/MemoryTargets.java b/gluten-core/src/main/java/org/apache/gluten/memory/memtarget/MemoryTargets.java
index e6cab4b6a8..2ad47f2890 100644
--- a/gluten-core/src/main/java/org/apache/gluten/memory/memtarget/MemoryTargets.java
+++ b/gluten-core/src/main/java/org/apache/gluten/memory/memtarget/MemoryTargets.java
@@ -22,6 +22,7 @@ import org.apache.gluten.memory.memtarget.spark.TreeMemoryConsumers;
import org.apache.spark.SparkEnv;
import org.apache.spark.annotation.Experimental;
+import org.apache.spark.memory.GlobalOffHeapMemoryTarget;
import org.apache.spark.memory.MemoryMode;
import org.apache.spark.memory.TaskMemoryManager;
import org.apache.spark.util.SparkResourceUtil;
@@ -37,6 +38,10 @@ public final class MemoryTargets {
// enclose factory ctor
}
+ public static MemoryTarget global() {
+ return new GlobalOffHeapMemoryTarget();
+ }
+
public static MemoryTarget throwOnOom(MemoryTarget target) {
return new ThrowOnOomMemoryTarget(target);
}
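
MemoryTargets.global() hands back a bare GlobalOffHeapMemoryTarget, which may
grant a request only partially; whether a shortfall is fatal remains the
caller's choice, made by composition, as GlobalOffHeapMemory does further
down. A sketch in Scala (the val name is illustrative):

    import org.apache.gluten.memory.memtarget.{MemoryTarget, MemoryTargets}

    object Targets {
      // Either grants in full or throws ThrowOnOomMemoryTarget.OutOfMemoryException.
      val fatalOnOom: MemoryTarget = MemoryTargets.throwOnOom(MemoryTargets.global())
    }
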
diff --git a/gluten-core/src/main/java/org/apache/gluten/memory/memtarget/ThrowOnOomMemoryTarget.java b/gluten-core/src/main/java/org/apache/gluten/memory/memtarget/ThrowOnOomMemoryTarget.java
index 8ebedd5418..5a2b10e279 100644
--- a/gluten-core/src/main/java/org/apache/gluten/memory/memtarget/ThrowOnOomMemoryTarget.java
+++ b/gluten-core/src/main/java/org/apache/gluten/memory/memtarget/ThrowOnOomMemoryTarget.java
@@ -23,7 +23,22 @@ import org.apache.spark.sql.internal.SQLConf;
import org.apache.spark.task.TaskResources;
import org.apache.spark.util.Utils;
+import java.util.Arrays;
+import java.util.List;
+import java.util.function.Function;
+
public class ThrowOnOomMemoryTarget implements MemoryTarget {
+ private static final List<String> PRINTED_NON_BYTES_CONFIGURATIONS =
+ Arrays.asList(
+ GlutenCoreConfig.SPARK_OFFHEAP_ENABLED_KEY(),
+ GlutenCoreConfig.DYNAMIC_OFFHEAP_SIZING_ENABLED().key());
+
+ private static final List<String> PRINTED_BYTES_CONFIGURATIONS =
+ Arrays.asList(
+ GlutenCoreConfig.COLUMNAR_OFFHEAP_SIZE_IN_BYTES().key(),
+ GlutenCoreConfig.COLUMNAR_TASK_OFFHEAP_SIZE_IN_BYTES().key(),
+ GlutenCoreConfig.COLUMNAR_CONSERVATIVE_TASK_OFFHEAP_SIZE_IN_BYTES().key());
+
private final MemoryTarget target;
public ThrowOnOomMemoryTarget(MemoryTarget target) {
@@ -46,8 +61,8 @@ public class ThrowOnOomMemoryTarget implements MemoryTarget {
if (TaskResources.inSparkTask()) {
TaskResources.getLocalTaskContext().taskMemoryManager().showMemoryUsage();
}
- // Build error message, then throw
- StringBuilder errorBuilder = new StringBuilder();
+ // Build error message, then throw.
+ final StringBuilder errorBuilder = new StringBuilder();
errorBuilder
.append(
String.format(
@@ -58,52 +73,34 @@ public class ThrowOnOomMemoryTarget implements MemoryTarget {
+ "is not enabled). %n",
Utils.bytesToString(size), Utils.bytesToString(granted)))
.append("Current config settings: ")
- .append(System.lineSeparator())
- .append(
- String.format(
- "\t%s=%s",
- GlutenCoreConfig.COLUMNAR_OFFHEAP_SIZE_IN_BYTES().key(),
- reformatBytes(
- SQLConf.get()
- .getConfString(GlutenCoreConfig.COLUMNAR_OFFHEAP_SIZE_IN_BYTES().key()))))
- .append(System.lineSeparator())
- .append(
- String.format(
- "\t%s=%s",
- GlutenCoreConfig.COLUMNAR_TASK_OFFHEAP_SIZE_IN_BYTES().key(),
- reformatBytes(
- SQLConf.get()
- .getConfString(
- GlutenCoreConfig.COLUMNAR_TASK_OFFHEAP_SIZE_IN_BYTES().key()))))
- .append(System.lineSeparator())
- .append(
- String.format(
- "\t%s=%s",
- GlutenCoreConfig.COLUMNAR_CONSERVATIVE_TASK_OFFHEAP_SIZE_IN_BYTES().key(),
- reformatBytes(
- SQLConf.get()
- .getConfString(
- GlutenCoreConfig.COLUMNAR_CONSERVATIVE_TASK_OFFHEAP_SIZE_IN_BYTES()
- .key()))))
- .append(System.lineSeparator())
- .append(
- String.format(
- "\t%s=%s",
- GlutenCoreConfig.SPARK_OFFHEAP_ENABLED_KEY(),
- SQLConf.get().getConfString(GlutenCoreConfig.SPARK_OFFHEAP_ENABLED_KEY())))
- .append(System.lineSeparator())
- .append(
- String.format(
- "\t%s=%s",
- GlutenCoreConfig.DYNAMIC_OFFHEAP_SIZING_ENABLED().key(),
- GlutenCoreConfig.get().dynamicOffHeapSizingEnabled()))
.append(System.lineSeparator());
+ for (String confKey : PRINTED_NON_BYTES_CONFIGURATIONS) {
+ errorBuilder
+ .append(String.format("\t%s=%s", confKey, getSqlConfStringOrNa(confKey, v -> v)))
+ .append(System.lineSeparator());
+ }
+ for (String confKey : PRINTED_BYTES_CONFIGURATIONS) {
+ errorBuilder
+ .append(
+ String.format(
+ "\t%s=%s",
+ confKey, getSqlConfStringOrNa(confKey, ThrowOnOomMemoryTarget::reformatBytes)))
+ .append(System.lineSeparator());
+ }
// Dump all consumer usages to exception body
errorBuilder.append(SparkMemoryUtil.dumpMemoryTargetStats(target));
errorBuilder.append(System.lineSeparator());
throw new OutOfMemoryException(errorBuilder.toString());
}
+ private static String getSqlConfStringOrNa(String confKey, Function<String, String> ifPresent) {
+ final SQLConf sqlConf = SQLConf.get();
+ if (!sqlConf.contains(confKey)) {
+ return "N/A";
+ }
+ return ifPresent.apply(sqlConf.getConfString(confKey));
+ }
+
private static String reformatBytes(String in) {
return Utils.bytesToString(Utils.byteStringAsBytes(in));
}
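
The rewrite collapses six hand-rolled append blocks into two key lists plus an
N/A-aware lookup, so unset keys print as "N/A" instead of letting SQLConf
throw on a missing value. The same pattern in Scala, for illustration only
(SQLConf.contains and SQLConf.getConfString are the real Spark APIs the Java
helper above uses):

    import org.apache.spark.sql.internal.SQLConf

    object ConfStringOrNa {
      def confStringOrNa(key: String)(format: String => String): String = {
        val conf = SQLConf.get
        if (conf.contains(key)) format(conf.getConfString(key)) else "N/A"
      }
    }
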
diff --git a/gluten-core/src/main/scala/org/apache/spark/memory/GlobalOffHeapMemory.scala b/gluten-core/src/main/scala/org/apache/spark/memory/GlobalOffHeapMemory.scala
index b2915084dd..8b0149d252 100644
--- a/gluten-core/src/main/scala/org/apache/spark/memory/GlobalOffHeapMemory.scala
+++ b/gluten-core/src/main/scala/org/apache/spark/memory/GlobalOffHeapMemory.scala
@@ -17,8 +17,7 @@
package org.apache.spark.memory
import org.apache.gluten.config.GlutenCoreConfig
-import org.apache.gluten.exception.GlutenException
-import org.apache.gluten.memory.memtarget.{MemoryTarget, NoopMemoryTarget}
+import org.apache.gluten.memory.memtarget.{MemoryTarget, MemoryTargets, NoopMemoryTarget}
/**
* API #acquire is for reserving some global off-heap memory from Spark memory manager. Once
@@ -33,14 +32,12 @@ object GlobalOffHeapMemory {
val target: MemoryTarget = if (GlutenCoreConfig.get.memoryUntracked) {
new NoopMemoryTarget()
} else {
- new GlobalOffHeapMemoryTarget()
+ MemoryTargets.throwOnOom(MemoryTargets.global())
}
def acquire(numBytes: Long): Unit = {
- if (target.borrow(numBytes) < numBytes) {
- // Throw OOM.
- throw new GlutenException(s"Spark global off-heap memory is exhausted.")
- }
+ // OOM will be handled in MemoryTargets.throwOnOom(...).
+ assert(target.borrow(numBytes) == numBytes)
}
def release(numBytes: Long): Unit = {
diff --git a/gluten-core/src/main/scala/org/apache/spark/memory/GlobalOffHeapMemoryTarget.scala b/gluten-core/src/main/scala/org/apache/spark/memory/GlobalOffHeapMemoryTarget.scala
index c43f0c7e57..27f2bb214b 100644
--- a/gluten-core/src/main/scala/org/apache/spark/memory/GlobalOffHeapMemoryTarget.scala
+++ b/gluten-core/src/main/scala/org/apache/spark/memory/GlobalOffHeapMemoryTarget.scala
@@ -108,6 +108,8 @@ class GlobalOffHeapMemoryTarget private[memory]
private[memory] def memoryManagerOption(): Option[MemoryManager] = {
val env = SparkEnv.get
if (env != null) {
+ // SPARK-46947: https://github.com/apache/spark/pull/45052.
+ ensureMemoryStoreInitialized(env)
return Some(env.memoryManager)
}
val tc = TaskContext.get()
@@ -120,6 +122,10 @@ class GlobalOffHeapMemoryTarget private[memory]
None
}
+ private def ensureMemoryStoreInitialized(env: SparkEnv): Unit = {
+ assert(env.blockManager.memoryStore != null)
+ }
+
override def name(): String = targetName
override def stats(): MemoryUsageStats = recorder.toStats
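
The SPARK-46947 guard forces BlockManager.memoryStore, a lazy val, to
initialize before env.memoryManager is handed out; the assert merely makes the
touch observable. A minimal sketch of the ordering concern (the Spark APIs are
real; the object and method names are illustrative, and memoryStore is
package-private to org.apache.spark, which is why the patched file itself
lives under that package tree):

    import org.apache.spark.SparkEnv
    import org.apache.spark.memory.MemoryManager

    object SafeMemoryManager {
      def memoryManagerSafely(env: SparkEnv): MemoryManager = {
        // Touch the lazy BlockManager.memoryStore first so the MemoryManager
        // is fully wired up before its first use (see SPARK-46947).
        assert(env.blockManager.memoryStore != null)
        env.memoryManager
      }
    }
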
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]