This is an automated email from the ASF dual-hosted git repository.

hongze pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git


The following commit(s) were added to refs/heads/main by this push:
     new 69854faaae [GLUTEN-11169][VL] Trigger GC before throwing OOM to ensure 
unused off-heap broadcasted relations are correctly released (#11236)
69854faaae is described below

commit 69854faaae98b11cf40dcf74ef900dcbeb91230f
Author: Hongze Zhang <[email protected]>
AuthorDate: Mon Dec 15 16:45:25 2025 +0800

    [GLUTEN-11169][VL] Trigger GC before throwing OOM to ensure unused off-heap 
broadcasted relations are correctly released (#11236)
---
 .../UnsafeColumnarBuildSideRelationTest.scala      | 63 ++++++++++--------
 .../memory/memtarget/ThrowOnOomMemoryTarget.java   | 75 ++++++++++++++++++++--
 2 files changed, 103 insertions(+), 35 deletions(-)

diff --git 
a/backends-velox/src/test/scala/org/apache/spark/sql/execution/unsafe/UnsafeColumnarBuildSideRelationTest.scala
 
b/backends-velox/src/test/scala/org/apache/spark/sql/execution/unsafe/UnsafeColumnarBuildSideRelationTest.scala
index 252bf451f6..82adee0375 100644
--- 
a/backends-velox/src/test/scala/org/apache/spark/sql/execution/unsafe/UnsafeColumnarBuildSideRelationTest.scala
+++ 
b/backends-velox/src/test/scala/org/apache/spark/sql/execution/unsafe/UnsafeColumnarBuildSideRelationTest.scala
@@ -45,19 +45,9 @@ class UnsafeColumnarBuildSideRelationTest extends 
SharedSparkSession {
   var unsafeRelWithIdentityMode: UnsafeColumnarBuildSideRelation = _
   var unsafeRelWithHashMode: UnsafeColumnarBuildSideRelation = _
   var output: Seq[Attribute] = _
-  var sampleBytes: Array[Array[Byte]] = _
+  var sample1KBytes: Array[Byte] = _
   var initialGlobalBytes: Long = _
 
-  private def toUnsafeByteArray(bytes: Array[Byte]): UnsafeByteArray = {
-    val buf = ArrowBufferAllocators.globalInstance().buffer(bytes.length)
-    buf.setBytes(0, bytes, 0, bytes.length)
-    try {
-      new UnsafeByteArray(buf, bytes.length.toLong)
-    } finally {
-      buf.close()
-    }
-  }
-
   private def toByteArray(unsafeByteArray: UnsafeByteArray): Array[Byte] = {
     val byteArray = new Array[Byte](Math.toIntExact(unsafeByteArray.size()))
     Platform.copyMemory(
@@ -73,9 +63,9 @@ class UnsafeColumnarBuildSideRelationTest extends 
SharedSparkSession {
     super.beforeAll()
     initialGlobalBytes = GlobalOffHeapMemory.currentBytes()
     output = Seq(AttributeReference("a", StringType, nullable = false, null)())
-    sampleBytes = Array(randomBytes(10), randomBytes(100))
-    unsafeRelWithIdentityMode = newUnsafeRelationWithIdentityMode(sampleBytes: 
_*)
-    unsafeRelWithHashMode = newUnsafeRelationWithHashMode(sampleBytes: _*)
+    sample1KBytes = randomBytes(1024)
+    unsafeRelWithIdentityMode = newUnsafeRelationWithIdentityMode(2)
+    unsafeRelWithHashMode = newUnsafeRelationWithHashMode(2)
   }
 
   override protected def afterAll(): Unit = {
@@ -84,7 +74,7 @@ class UnsafeColumnarBuildSideRelationTest extends 
SharedSparkSession {
     unsafeRelWithIdentityMode = null
     unsafeRelWithHashMode = null
     System.gc()
-    Thread.sleep(500)
+    Thread.sleep(1000)
     // FIXME: This should be zero. We had to assert against the initial bytes
     //  because there were some allocations left over from previously run suites.
     assert(GlobalOffHeapMemory.currentBytes() == initialGlobalBytes)
@@ -97,22 +87,31 @@ class UnsafeColumnarBuildSideRelationTest extends 
SharedSparkSession {
     array
   }
 
-  private def newUnsafeRelationWithIdentityMode(
-      bytes: Array[Byte]*): UnsafeColumnarBuildSideRelation = {
-    require(bytes.nonEmpty)
+  private def sampleUnsafeByteArrayInKb(sizeInKb: Int): UnsafeByteArray = {
+    val sizeInBytes = sizeInKb * 1024
+    val buf = ArrowBufferAllocators.globalInstance().buffer(sizeInBytes)
+    for (i <- 0 until sizeInKb) {
+      buf.setBytes(i * 1024, sample1KBytes, 0, 1024)
+    }
+    try {
+      new UnsafeByteArray(buf, sizeInBytes)
+    } finally {
+      buf.close()
+    }
+  }
+
+  private def newUnsafeRelationWithIdentityMode(sizeInKb: Int): 
UnsafeColumnarBuildSideRelation = {
     UnsafeColumnarBuildSideRelation(
       output,
-      bytes.map(a => toUnsafeByteArray(a)),
+      (0 until sizeInKb).map(_ => sampleUnsafeByteArrayInKb(1)),
       IdentityBroadcastMode
     )
   }
 
-  private def newUnsafeRelationWithHashMode(
-      bytes: Array[Byte]*): UnsafeColumnarBuildSideRelation = {
-    require(bytes.nonEmpty)
+  private def newUnsafeRelationWithHashMode(sizeInKb: Int): 
UnsafeColumnarBuildSideRelation = {
     UnsafeColumnarBuildSideRelation(
       output,
-      bytes.map(a => toUnsafeByteArray(a)),
+      (0 until sizeInKb).map(_ => sampleUnsafeByteArrayInKb(1)),
       HashedRelationBroadcastMode(output, isNullAware = false)
     )
   }
@@ -129,7 +128,7 @@ class UnsafeColumnarBuildSideRelationTest extends 
SharedSparkSession {
     assert(
       util.Arrays.deepEquals(
         obj.getBatches().map(toByteArray).toArray[AnyRef],
-        sampleBytes.asInstanceOf[Array[AnyRef]]))
+        Array(sample1KBytes, sample1KBytes).asInstanceOf[Array[AnyRef]]))
 
     // test unsafeRelWithHashMode
     val buffer2 = serializerInstance.serialize(unsafeRelWithHashMode)
@@ -139,7 +138,7 @@ class UnsafeColumnarBuildSideRelationTest extends 
SharedSparkSession {
     assert(
       util.Arrays.deepEquals(
         obj2.getBatches().map(toByteArray).toArray[AnyRef],
-        sampleBytes.asInstanceOf[Array[AnyRef]]))
+        Array(sample1KBytes, sample1KBytes).asInstanceOf[Array[AnyRef]]))
   }
 
   test("Kryo serialization") {
@@ -154,7 +153,7 @@ class UnsafeColumnarBuildSideRelationTest extends 
SharedSparkSession {
     assert(
       util.Arrays.deepEquals(
         obj.getBatches().map(toByteArray).toArray[AnyRef],
-        sampleBytes.asInstanceOf[Array[AnyRef]]))
+        Array(sample1KBytes, sample1KBytes).asInstanceOf[Array[AnyRef]]))
 
     // test unsafeRelWithHashMode
     val buffer2 = serializerInstance.serialize(unsafeRelWithHashMode)
@@ -164,7 +163,7 @@ class UnsafeColumnarBuildSideRelationTest extends 
SharedSparkSession {
     assert(
       util.Arrays.deepEquals(
         obj2.getBatches().map(toByteArray).toArray[AnyRef],
-        sampleBytes.asInstanceOf[Array[AnyRef]]))
+        Array(sample1KBytes, sample1KBytes).asInstanceOf[Array[AnyRef]]))
   }
 
   test("Should throw OOM when off-heap memory is running out") {
@@ -172,9 +171,17 @@ class UnsafeColumnarBuildSideRelationTest extends 
SharedSparkSession {
     val relations = mutable.ListBuffer[UnsafeColumnarBuildSideRelation]()
     assertThrows[OutOfMemoryException] {
       for (i <- 0 until 10) {
-        relations += 
newUnsafeRelationWithHashMode(randomBytes(ByteUnit.MiB.toBytes(50).toInt))
+        relations += 
newUnsafeRelationWithHashMode(ByteUnit.MiB.toKiB(50).toInt)
       }
     }
     relations.clear()
   }
+
+  test("Should trigger GC before OOM") {
+    // 500 MiB > 200 MiB, but since we don't keep references to the created relations,
+    // GC will be triggered and no OOM should be thrown.
+    for (i <- 0 until 10) {
+      newUnsafeRelationWithHashMode(ByteUnit.MiB.toKiB(50).toInt)
+    }
+  }
 }
diff --git 
a/gluten-core/src/main/java/org/apache/gluten/memory/memtarget/ThrowOnOomMemoryTarget.java
 
b/gluten-core/src/main/java/org/apache/gluten/memory/memtarget/ThrowOnOomMemoryTarget.java
index 5a2b10e279..395f956fe9 100644
--- 
a/gluten-core/src/main/java/org/apache/gluten/memory/memtarget/ThrowOnOomMemoryTarget.java
+++ 
b/gluten-core/src/main/java/org/apache/gluten/memory/memtarget/ThrowOnOomMemoryTarget.java
@@ -22,12 +22,20 @@ import org.apache.spark.memory.SparkMemoryUtil;
 import org.apache.spark.sql.internal.SQLConf;
 import org.apache.spark.task.TaskResources;
 import org.apache.spark.util.Utils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.util.Arrays;
 import java.util.List;
 import java.util.function.Function;
 
 public class ThrowOnOomMemoryTarget implements MemoryTarget {
+  private static final Logger LOG = 
LoggerFactory.getLogger(ThrowOnOomMemoryTarget.class);
+  // Maximum number of sleeps while retrying the reservation.
+  // Sleep durations are, in order, 1, 2, 4, 8, 16, 32, 64, 128, 256 ms
+  // (511 ms ~ 0.5 s in total).
+  private static final int MAX_SLEEPS = 9;
+  private static final int MAX_WAIT_MS = 1000;
+
   private static final List<String> PRINTED_NON_BYTES_CONFIGURATIONS =
       Arrays.asList(
           GlutenCoreConfig.SPARK_OFFHEAP_ENABLED_KEY(),
@@ -47,17 +55,70 @@ public class ThrowOnOomMemoryTarget implements MemoryTarget 
{
 
   @Override
   public long borrow(long size) {
-    long granted = target.borrow(size);
-    if (granted >= size) {
-      return granted;
+    long granted;
+    {
+      granted = target.borrow(size);
+      if (granted >= size) {
+        return granted;
+      }
+      if (granted != 0L) {
+        target.repay(granted);
+      }
+    }
+
+    // About to OOM.
+    LOG.warn("Off-heap reservation of {} bytes failed.", size);
+
+    // Invoke GC, then retry the reservation up to 9 times (adding up to ~1 s
+    // of extra delay in total). This ensures we wait for GC to collect all
+    // unreachable objects, during which their off-heap allocations may also be
+    // returned to the memory manager. For example, UnsafeByteArray implements
+    // `finalize` to release its off-heap memory allocation, so its lifecycle
+    // relies on JVM GC.
+    LOG.warn("Invoking GC to try reclaiming some off-heap memory space if 
applicable...");
+    System.gc();
+    final long start = System.currentTimeMillis();
+    int sleeps = 0;
+    long sleepTime = 1;
+    while (true) {
+      final long elapsedMs = System.currentTimeMillis() - start;
+      if (elapsedMs >= MAX_WAIT_MS) {
+        LOG.warn("Max wait time (in ms) {} has reached. ", MAX_WAIT_MS);
+        break;
+      }
+      LOG.warn(
+          "Retrying reserving {} bytes (finished {}/{} number of sleeps, 
elapsed {}/{} ms)... ",
+          size,
+          sleeps,
+          MAX_SLEEPS,
+          elapsedMs,
+          MAX_WAIT_MS);
+      granted = target.borrow(size);
+      if (granted >= size) {
+        return granted;
+      }
+      if (granted != 0L) {
+        target.repay(granted);
+      }
+      if (sleeps >= MAX_SLEEPS) {
+        LOG.warn("Max number of sleeps {} has reached. ", MAX_SLEEPS);
+        break;
+      }
+      try {
+        Thread.sleep(sleepTime);
+      } catch (InterruptedException e) {
+        Thread.currentThread().interrupt();
+        break;
+      }
+      sleepTime *= 2;
+      sleeps++;
     }
+
     // OOM happens.
     // Note if the target is a Spark memory consumer, spilling should already 
be requested but
     // failed to reclaim enough memory.
-    if (granted != 0L) {
-      target.repay(granted);
-    }
-    // Log memory usage
+    //
+    // Log memory usage.
     if (TaskResources.inSparkTask()) {
       
TaskResources.getLocalTaskContext().taskMemoryManager().showMemoryUsage();
     }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to