This is an automated email from the ASF dual-hosted git repository.

hongze pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git


The following commit(s) were added to refs/heads/main by this push:
     new 55645156c4 [GLUTEN-11169][VL] Fix OOM error not thrown when global 
off-heap memory is running out by off-heap BHJ (#11208)
55645156c4 is described below

commit 55645156c4cd8f779aad7aba6fae9c3f1bb7bd78
Author: Hongze Zhang <[email protected]>
AuthorDate: Fri Nov 28 16:01:00 2025 +0000

    [GLUTEN-11169][VL] Fix OOM error not thrown when global off-heap memory is 
running out by off-heap BHJ (#11208)
---
 .../backendsapi/velox/VeloxListenerApi.scala       |  6 +-
 .../spark/memory/GlobalOffHeapMemorySuite.scala    | 10 +--
 .../UnsafeColumnarBuildSideRelationTest.scala      | 73 +++++++++++++++++---
 .../gluten/memory/memtarget/MemoryTargets.java     |  5 ++
 .../memory/memtarget/ThrowOnOomMemoryTarget.java   | 79 +++++++++++-----------
 .../apache/spark/memory/GlobalOffHeapMemory.scala  | 11 ++-
 .../spark/memory/GlobalOffHeapMemoryTarget.scala   |  6 ++
 7 files changed, 125 insertions(+), 65 deletions(-)

diff --git 
a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxListenerApi.scala
 
b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxListenerApi.scala
index 3488ef5653..585f6d736d 100644
--- 
a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxListenerApi.scala
+++ 
b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxListenerApi.scala
@@ -27,6 +27,7 @@ import org.apache.gluten.init.NativeBackendInitializer
 import org.apache.gluten.jni.{JniLibLoader, JniWorkspace}
 import org.apache.gluten.memory.{MemoryUsageRecorder, 
SimpleMemoryUsageRecorder}
 import org.apache.gluten.memory.listener.ReservationListener
+import org.apache.gluten.memory.memtarget.MemoryTarget
 import org.apache.gluten.monitor.VeloxMemoryProfiler
 import org.apache.gluten.udf.UdfJniWrapper
 import org.apache.gluten.utils._
@@ -276,15 +277,16 @@ object VeloxListenerApi {
   private def newGlobalOffHeapMemoryListener(): ReservationListener = {
     new ReservationListener {
       private val recorder: MemoryUsageRecorder = new 
SimpleMemoryUsageRecorder()
+      private val target: MemoryTarget = GlobalOffHeapMemory.target
 
       override def reserve(size: Long): Long = {
-        GlobalOffHeapMemory.acquire(size)
+        assert(target.borrow(size) == size)
         recorder.inc(size)
         size
       }
 
       override def unreserve(size: Long): Long = {
-        GlobalOffHeapMemory.release(size)
+        assert(target.repay(size) == size)
         recorder.inc(-size)
         size
       }
diff --git 
a/backends-velox/src/test/scala/org/apache/spark/memory/GlobalOffHeapMemorySuite.scala
 
b/backends-velox/src/test/scala/org/apache/spark/memory/GlobalOffHeapMemorySuite.scala
index 084386fcd7..95053e3e65 100644
--- 
a/backends-velox/src/test/scala/org/apache/spark/memory/GlobalOffHeapMemorySuite.scala
+++ 
b/backends-velox/src/test/scala/org/apache/spark/memory/GlobalOffHeapMemorySuite.scala
@@ -17,8 +17,8 @@
 package org.apache.spark.memory
 
 import org.apache.gluten.config.GlutenCoreConfig
-import org.apache.gluten.exception.GlutenException
 import org.apache.gluten.memory.memtarget.{Spillers, TreeMemoryTarget}
+import 
org.apache.gluten.memory.memtarget.ThrowOnOomMemoryTarget.OutOfMemoryException
 import org.apache.gluten.memory.memtarget.spark.TreeMemoryConsumers
 
 import org.apache.spark.TaskContext
@@ -54,11 +54,11 @@ class GlobalOffHeapMemorySuite extends AnyFunSuite with 
BeforeAndAfterAll {
       assert(consumer.borrow(300) == 300)
       GlobalOffHeapMemory.acquire(50)
       GlobalOffHeapMemory.acquire(40)
-      assertThrows[GlutenException](GlobalOffHeapMemory.acquire(30))
-      assertThrows[GlutenException](GlobalOffHeapMemory.acquire(11))
+      assertThrows[OutOfMemoryException](GlobalOffHeapMemory.acquire(30))
+      assertThrows[OutOfMemoryException](GlobalOffHeapMemory.acquire(11))
       GlobalOffHeapMemory.acquire(10)
       GlobalOffHeapMemory.acquire(0)
-      assertThrows[GlutenException](GlobalOffHeapMemory.acquire(1))
+      assertThrows[OutOfMemoryException](GlobalOffHeapMemory.acquire(1))
     }
   }
 
@@ -113,7 +113,7 @@ class GlobalOffHeapMemorySuite extends AnyFunSuite with 
BeforeAndAfterAll {
             Spillers.NOOP,
             Collections.emptyMap())
       assert(consumer.borrow(300) == 300)
-      assertThrows[GlutenException](GlobalOffHeapMemory.acquire(200))
+      assertThrows[OutOfMemoryException](GlobalOffHeapMemory.acquire(200))
       assert(consumer.repay(100) == 100)
       GlobalOffHeapMemory.acquire(200)
     }
diff --git 
a/backends-velox/src/test/scala/org/apache/spark/sql/execution/unsafe/UnsafeColumnarBuildSideRelationTest.scala
 
b/backends-velox/src/test/scala/org/apache/spark/sql/execution/unsafe/UnsafeColumnarBuildSideRelationTest.scala
index 6d0448fd84..252bf451f6 100644
--- 
a/backends-velox/src/test/scala/org/apache/spark/sql/execution/unsafe/UnsafeColumnarBuildSideRelationTest.scala
+++ 
b/backends-velox/src/test/scala/org/apache/spark/sql/execution/unsafe/UnsafeColumnarBuildSideRelationTest.scala
@@ -17,10 +17,13 @@
 package org.apache.spark.sql.execution.unsafe
 
 import org.apache.gluten.memory.arrow.alloc.ArrowBufferAllocators
+import 
org.apache.gluten.memory.memtarget.ThrowOnOomMemoryTarget.OutOfMemoryException
 
 import org.apache.spark.{SparkConf, SparkEnv}
+import org.apache.spark.memory.GlobalOffHeapMemory
+import org.apache.spark.network.util.ByteUnit
 import org.apache.spark.serializer.{JavaSerializer, KryoSerializer}
-import org.apache.spark.sql.catalyst.expressions.AttributeReference
+import org.apache.spark.sql.catalyst.expressions.{Attribute, 
AttributeReference}
 import org.apache.spark.sql.catalyst.plans.physical.IdentityBroadcastMode
 import org.apache.spark.sql.execution.joins.HashedRelationBroadcastMode
 import org.apache.spark.sql.test.SharedSparkSession
@@ -29,6 +32,9 @@ import org.apache.spark.unsafe.Platform
 
 import java.util
 
+import scala.collection.mutable
+import scala.util.Random
+
 class UnsafeColumnarBuildSideRelationTest extends SharedSparkSession {
   override protected def sparkConf: SparkConf = {
     super.sparkConf
@@ -38,12 +44,18 @@ class UnsafeColumnarBuildSideRelationTest extends 
SharedSparkSession {
 
   var unsafeRelWithIdentityMode: UnsafeColumnarBuildSideRelation = _
   var unsafeRelWithHashMode: UnsafeColumnarBuildSideRelation = _
+  var output: Seq[Attribute] = _
   var sampleBytes: Array[Array[Byte]] = _
+  var initialGlobalBytes: Long = _
 
   private def toUnsafeByteArray(bytes: Array[Byte]): UnsafeByteArray = {
     val buf = ArrowBufferAllocators.globalInstance().buffer(bytes.length)
-    buf.setBytes(0, bytes, 0, bytes.length);
-    new UnsafeByteArray(buf, bytes.length.toLong)
+    buf.setBytes(0, bytes, 0, bytes.length)
+    try {
+      new UnsafeByteArray(buf, bytes.length.toLong)
+    } finally {
+      buf.close()
+    }
   }
 
   private def toByteArray(unsafeByteArray: UnsafeByteArray): Array[Byte] = {
@@ -59,17 +71,48 @@ class UnsafeColumnarBuildSideRelationTest extends 
SharedSparkSession {
 
   override def beforeAll(): Unit = {
     super.beforeAll()
-    val a = AttributeReference("a", StringType, nullable = false, null)()
-    val output = Seq(a)
-    sampleBytes = Array("12345".getBytes(), "7890".getBytes)
-    unsafeRelWithIdentityMode = UnsafeColumnarBuildSideRelation(
+    initialGlobalBytes = GlobalOffHeapMemory.currentBytes()
+    output = Seq(AttributeReference("a", StringType, nullable = false, null)())
+    sampleBytes = Array(randomBytes(10), randomBytes(100))
+    unsafeRelWithIdentityMode = newUnsafeRelationWithIdentityMode(sampleBytes: 
_*)
+    unsafeRelWithHashMode = newUnsafeRelationWithHashMode(sampleBytes: _*)
+  }
+
+  override protected def afterAll(): Unit = {
+    // Makes sure all the underlying UnsafeByteArray instances become GC 
non-reachable and
+    // be released after a full-GC.
+    unsafeRelWithIdentityMode = null
+    unsafeRelWithHashMode = null
+    System.gc()
+    Thread.sleep(500)
+    // FIXME: This should be zero. We had to assert with the initial bytes 
because
+    //  there were some allocations from the previously run suites.
+    assert(GlobalOffHeapMemory.currentBytes() == initialGlobalBytes)
+  }
+
+  private def randomBytes(size: Int): Array[Byte] = {
+    val array = new Array[Byte](size)
+    val random = new Random()
+    random.nextBytes(array)
+    array
+  }
+
+  private def newUnsafeRelationWithIdentityMode(
+      bytes: Array[Byte]*): UnsafeColumnarBuildSideRelation = {
+    require(bytes.nonEmpty)
+    UnsafeColumnarBuildSideRelation(
       output,
-      sampleBytes.map(a => toUnsafeByteArray(a)),
+      bytes.map(a => toUnsafeByteArray(a)),
       IdentityBroadcastMode
     )
-    unsafeRelWithHashMode = UnsafeColumnarBuildSideRelation(
+  }
+
+  private def newUnsafeRelationWithHashMode(
+      bytes: Array[Byte]*): UnsafeColumnarBuildSideRelation = {
+    require(bytes.nonEmpty)
+    UnsafeColumnarBuildSideRelation(
       output,
-      sampleBytes.map(a => toUnsafeByteArray(a)),
+      bytes.map(a => toUnsafeByteArray(a)),
       HashedRelationBroadcastMode(output, isNullAware = false)
     )
   }
@@ -124,4 +167,14 @@ class UnsafeColumnarBuildSideRelationTest extends 
SharedSparkSession {
         sampleBytes.asInstanceOf[Array[AnyRef]]))
   }
 
+  test("Should throw OOM when off-heap memory is running out") {
+    // 500 MiB > 200 MiB so OOM should be thrown.
+    val relations = mutable.ListBuffer[UnsafeColumnarBuildSideRelation]()
+    assertThrows[OutOfMemoryException] {
+      for (i <- 0 until 10) {
+        relations += 
newUnsafeRelationWithHashMode(randomBytes(ByteUnit.MiB.toBytes(50).toInt))
+      }
+    }
+    relations.clear()
+  }
 }
diff --git 
a/gluten-core/src/main/java/org/apache/gluten/memory/memtarget/MemoryTargets.java
 
b/gluten-core/src/main/java/org/apache/gluten/memory/memtarget/MemoryTargets.java
index e6cab4b6a8..2ad47f2890 100644
--- 
a/gluten-core/src/main/java/org/apache/gluten/memory/memtarget/MemoryTargets.java
+++ 
b/gluten-core/src/main/java/org/apache/gluten/memory/memtarget/MemoryTargets.java
@@ -22,6 +22,7 @@ import 
org.apache.gluten.memory.memtarget.spark.TreeMemoryConsumers;
 
 import org.apache.spark.SparkEnv;
 import org.apache.spark.annotation.Experimental;
+import org.apache.spark.memory.GlobalOffHeapMemoryTarget;
 import org.apache.spark.memory.MemoryMode;
 import org.apache.spark.memory.TaskMemoryManager;
 import org.apache.spark.util.SparkResourceUtil;
@@ -37,6 +38,10 @@ public final class MemoryTargets {
     // enclose factory ctor
   }
 
+  public static MemoryTarget global() {
+    return new GlobalOffHeapMemoryTarget();
+  }
+
   public static MemoryTarget throwOnOom(MemoryTarget target) {
     return new ThrowOnOomMemoryTarget(target);
   }
diff --git 
a/gluten-core/src/main/java/org/apache/gluten/memory/memtarget/ThrowOnOomMemoryTarget.java
 
b/gluten-core/src/main/java/org/apache/gluten/memory/memtarget/ThrowOnOomMemoryTarget.java
index 8ebedd5418..5a2b10e279 100644
--- 
a/gluten-core/src/main/java/org/apache/gluten/memory/memtarget/ThrowOnOomMemoryTarget.java
+++ 
b/gluten-core/src/main/java/org/apache/gluten/memory/memtarget/ThrowOnOomMemoryTarget.java
@@ -23,7 +23,22 @@ import org.apache.spark.sql.internal.SQLConf;
 import org.apache.spark.task.TaskResources;
 import org.apache.spark.util.Utils;
 
+import java.util.Arrays;
+import java.util.List;
+import java.util.function.Function;
+
 public class ThrowOnOomMemoryTarget implements MemoryTarget {
+  private static final List<String> PRINTED_NON_BYTES_CONFIGURATIONS =
+      Arrays.asList(
+          GlutenCoreConfig.SPARK_OFFHEAP_ENABLED_KEY(),
+          GlutenCoreConfig.DYNAMIC_OFFHEAP_SIZING_ENABLED().key());
+
+  private static final List<String> PRINTED_BYTES_CONFIGURATIONS =
+      Arrays.asList(
+          GlutenCoreConfig.COLUMNAR_OFFHEAP_SIZE_IN_BYTES().key(),
+          GlutenCoreConfig.COLUMNAR_TASK_OFFHEAP_SIZE_IN_BYTES().key(),
+          
GlutenCoreConfig.COLUMNAR_CONSERVATIVE_TASK_OFFHEAP_SIZE_IN_BYTES().key());
+
   private final MemoryTarget target;
 
   public ThrowOnOomMemoryTarget(MemoryTarget target) {
@@ -46,8 +61,8 @@ public class ThrowOnOomMemoryTarget implements MemoryTarget {
     if (TaskResources.inSparkTask()) {
       
TaskResources.getLocalTaskContext().taskMemoryManager().showMemoryUsage();
     }
-    // Build error message, then throw
-    StringBuilder errorBuilder = new StringBuilder();
+    // Build error message, then throw.
+    final StringBuilder errorBuilder = new StringBuilder();
     errorBuilder
         .append(
             String.format(
@@ -58,52 +73,34 @@ public class ThrowOnOomMemoryTarget implements MemoryTarget 
{
                     + "is not enabled). %n",
                 Utils.bytesToString(size), Utils.bytesToString(granted)))
         .append("Current config settings: ")
-        .append(System.lineSeparator())
-        .append(
-            String.format(
-                "\t%s=%s",
-                GlutenCoreConfig.COLUMNAR_OFFHEAP_SIZE_IN_BYTES().key(),
-                reformatBytes(
-                    SQLConf.get()
-                        
.getConfString(GlutenCoreConfig.COLUMNAR_OFFHEAP_SIZE_IN_BYTES().key()))))
-        .append(System.lineSeparator())
-        .append(
-            String.format(
-                "\t%s=%s",
-                GlutenCoreConfig.COLUMNAR_TASK_OFFHEAP_SIZE_IN_BYTES().key(),
-                reformatBytes(
-                    SQLConf.get()
-                        .getConfString(
-                            
GlutenCoreConfig.COLUMNAR_TASK_OFFHEAP_SIZE_IN_BYTES().key()))))
-        .append(System.lineSeparator())
-        .append(
-            String.format(
-                "\t%s=%s",
-                
GlutenCoreConfig.COLUMNAR_CONSERVATIVE_TASK_OFFHEAP_SIZE_IN_BYTES().key(),
-                reformatBytes(
-                    SQLConf.get()
-                        .getConfString(
-                            
GlutenCoreConfig.COLUMNAR_CONSERVATIVE_TASK_OFFHEAP_SIZE_IN_BYTES()
-                                .key()))))
-        .append(System.lineSeparator())
-        .append(
-            String.format(
-                "\t%s=%s",
-                GlutenCoreConfig.SPARK_OFFHEAP_ENABLED_KEY(),
-                
SQLConf.get().getConfString(GlutenCoreConfig.SPARK_OFFHEAP_ENABLED_KEY())))
-        .append(System.lineSeparator())
-        .append(
-            String.format(
-                "\t%s=%s",
-                GlutenCoreConfig.DYNAMIC_OFFHEAP_SIZING_ENABLED().key(),
-                GlutenCoreConfig.get().dynamicOffHeapSizingEnabled()))
         .append(System.lineSeparator());
+    for (String confKey : PRINTED_NON_BYTES_CONFIGURATIONS) {
+      errorBuilder
+          .append(String.format("\t%s=%s", confKey, 
getSqlConfStringOrNa(confKey, v -> v)))
+          .append(System.lineSeparator());
+    }
+    for (String confKey : PRINTED_BYTES_CONFIGURATIONS) {
+      errorBuilder
+          .append(
+              String.format(
+                  "\t%s=%s",
+                  confKey, getSqlConfStringOrNa(confKey, 
ThrowOnOomMemoryTarget::reformatBytes)))
+          .append(System.lineSeparator());
+    }
     // Dump all consumer usages to exception body
     errorBuilder.append(SparkMemoryUtil.dumpMemoryTargetStats(target));
     errorBuilder.append(System.lineSeparator());
     throw new OutOfMemoryException(errorBuilder.toString());
   }
 
+  private static String getSqlConfStringOrNa(String confKey, Function<String, 
String> ifPresent) {
+    final SQLConf sqlConf = SQLConf.get();
+    if (!sqlConf.contains(confKey)) {
+      return "N/A";
+    }
+    return ifPresent.apply(sqlConf.getConfString(confKey));
+  }
+
   private static String reformatBytes(String in) {
     return Utils.bytesToString(Utils.byteStringAsBytes(in));
   }
diff --git 
a/gluten-core/src/main/scala/org/apache/spark/memory/GlobalOffHeapMemory.scala 
b/gluten-core/src/main/scala/org/apache/spark/memory/GlobalOffHeapMemory.scala
index b2915084dd..8b0149d252 100644
--- 
a/gluten-core/src/main/scala/org/apache/spark/memory/GlobalOffHeapMemory.scala
+++ 
b/gluten-core/src/main/scala/org/apache/spark/memory/GlobalOffHeapMemory.scala
@@ -17,8 +17,7 @@
 package org.apache.spark.memory
 
 import org.apache.gluten.config.GlutenCoreConfig
-import org.apache.gluten.exception.GlutenException
-import org.apache.gluten.memory.memtarget.{MemoryTarget, NoopMemoryTarget}
+import org.apache.gluten.memory.memtarget.{MemoryTarget, MemoryTargets, 
NoopMemoryTarget}
 
 /**
 * API #acquire is for reserving some global off-heap memory from Spark memory 
manager. Once
@@ -33,14 +32,12 @@ object GlobalOffHeapMemory {
   val target: MemoryTarget = if (GlutenCoreConfig.get.memoryUntracked) {
     new NoopMemoryTarget()
   } else {
-    new GlobalOffHeapMemoryTarget()
+    MemoryTargets.throwOnOom(MemoryTargets.global())
   }
 
   def acquire(numBytes: Long): Unit = {
-    if (target.borrow(numBytes) < numBytes) {
-      // Throw OOM.
-      throw new GlutenException(s"Spark global off-heap memory is exhausted.")
-    }
+    // OOM will be handled in MemoryTargets.throwOnOom(...).
+    assert(target.borrow(numBytes) == numBytes)
   }
 
   def release(numBytes: Long): Unit = {
diff --git 
a/gluten-core/src/main/scala/org/apache/spark/memory/GlobalOffHeapMemoryTarget.scala
 
b/gluten-core/src/main/scala/org/apache/spark/memory/GlobalOffHeapMemoryTarget.scala
index c43f0c7e57..27f2bb214b 100644
--- 
a/gluten-core/src/main/scala/org/apache/spark/memory/GlobalOffHeapMemoryTarget.scala
+++ 
b/gluten-core/src/main/scala/org/apache/spark/memory/GlobalOffHeapMemoryTarget.scala
@@ -108,6 +108,8 @@ class GlobalOffHeapMemoryTarget private[memory]
   private[memory] def memoryManagerOption(): Option[MemoryManager] = {
     val env = SparkEnv.get
     if (env != null) {
+      // SPARK-46947: https://github.com/apache/spark/pull/45052.
+      ensureMemoryStoreInitialized(env)
       return Some(env.memoryManager)
     }
     val tc = TaskContext.get()
@@ -120,6 +122,10 @@ class GlobalOffHeapMemoryTarget private[memory]
     None
   }
 
+  private def ensureMemoryStoreInitialized(env: SparkEnv): Unit = {
+    assert(env.blockManager.memoryStore != null)
+  }
+
   override def name(): String = targetName
 
   override def stats(): MemoryUsageStats = recorder.toStats


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to