This is an automated email from the ASF dual-hosted git repository.
zhztheplayer pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/gluten.git
The following commit(s) were added to refs/heads/main by this push:
new 26a8f9cb38 [VL] Simplify lifecycle of NativeMemoryManager and
NativeThreadManager (#12323)
26a8f9cb38 is described below
commit 26a8f9cb380f322d100d1ff56ade88b101504061
Author: Hongze Zhang <[email protected]>
AuthorDate: Tue Jun 30 11:14:23 2026 +0100
[VL] Simplify lifecycle of NativeMemoryManager and NativeThreadManager
(#12323)
---
.../apache/gluten/memory/NativeMemoryManager.scala | 20 ++++++----------
.../scala/org/apache/gluten/runtime/Runtime.scala | 3 ++-
.../gluten/threads/NativeThreadManager.scala | 27 +++++++++-------------
3 files changed, 20 insertions(+), 30 deletions(-)
diff --git
a/gluten-arrow/src/main/scala/org/apache/gluten/memory/NativeMemoryManager.scala
b/gluten-arrow/src/main/scala/org/apache/gluten/memory/NativeMemoryManager.scala
index 159e1bba5e..3308f357a7 100644
---
a/gluten-arrow/src/main/scala/org/apache/gluten/memory/NativeMemoryManager.scala
+++
b/gluten-arrow/src/main/scala/org/apache/gluten/memory/NativeMemoryManager.scala
@@ -25,7 +25,6 @@ import org.apache.gluten.utils.ConfigUtil
import org.apache.spark.memory.SparkMemoryUtil
import org.apache.spark.sql.internal.{GlutenConfigUtil, SQLConf}
-import org.apache.spark.task.{TaskResource, TaskResources}
import org.slf4j.LoggerFactory
@@ -38,12 +37,13 @@ trait NativeMemoryManager {
def addSpiller(spiller: Spiller): Unit
def hold(): Unit
def getHandle(): Long
+ def release(): Unit
}
object NativeMemoryManager {
private class Impl(backendName: String, name: String)
- extends NativeMemoryManager
- with TaskResource {
+ extends NativeMemoryManager {
+ private val nmmName = s"[nmm-$name]"
private val LOGGER = LoggerFactory.getLogger(classOf[NativeMemoryManager])
private val spillers = Spillers.appendable()
private val mutableStats: mutable.Map[String, MemoryUsageStatsBuilder] =
mutable.Map()
@@ -81,14 +81,14 @@ object NativeMemoryManager {
override def release(): Unit = {
if (!released.compareAndSet(false, true)) {
throw new GlutenException(
- s"Memory manager instance already released: $handle,
${resourceName()}, ${priority()}")
+ s"Memory manager instance already released: $handle")
}
def dump(): String = {
SparkMemoryUtil.prettyPrintStats(
- s"[${resourceName()}]",
+ nmmName,
new KnownNameAndStats() {
- override def name: String = resourceName()
+ override def name: String = nmmName
override def stats: MemoryUsageStats = collectUsage()
})
}
@@ -110,15 +110,9 @@ object NativeMemoryManager {
))
}
}
- override def priority(): Int = {
- // Memory managers should be released after all runtimes are released.
- // So set the priority lower than runtime resources.
- 10
- }
- override def resourceName(): String = "nmm"
}
def apply(backendName: String, name: String): NativeMemoryManager = {
- TaskResources.addAnonymousResource(new Impl(backendName, name))
+ new Impl(backendName, name)
}
}
diff --git
a/gluten-arrow/src/main/scala/org/apache/gluten/runtime/Runtime.scala
b/gluten-arrow/src/main/scala/org/apache/gluten/runtime/Runtime.scala
index 2e3c468140..885dd831e8 100644
--- a/gluten-arrow/src/main/scala/org/apache/gluten/runtime/Runtime.scala
+++ b/gluten-arrow/src/main/scala/org/apache/gluten/runtime/Runtime.scala
@@ -81,7 +81,8 @@ object Runtime {
s"Runtime instance already released: $handle, ${resourceName()},
${priority()}")
}
RuntimeJniWrapper.releaseRuntime(handle)
-
+ ntm.release()
+ nmm.release()
}
override def priority(): Int = 30
diff --git
a/gluten-arrow/src/main/scala/org/apache/gluten/threads/NativeThreadManager.scala
b/gluten-arrow/src/main/scala/org/apache/gluten/threads/NativeThreadManager.scala
index 8abe6d9c9b..ce0507b69a 100644
---
a/gluten-arrow/src/main/scala/org/apache/gluten/threads/NativeThreadManager.scala
+++
b/gluten-arrow/src/main/scala/org/apache/gluten/threads/NativeThreadManager.scala
@@ -18,29 +18,29 @@ package org.apache.gluten.threads
import org.apache.gluten.exception.GlutenException
-import org.apache.spark.task.{TaskResource, TaskResources}
-
import java.util.concurrent.atomic.AtomicBoolean
/**
* Scala wrapper around a native ThreadManager handle.
*
- * Created once per Spark task and registered as a [[TaskResource]] so it is automatically released
- * when the task completes. The ThreadManager wraps a [[NativeThreadInitializer]] that propagates
- * task context to native worker threads spawned by folly executors.
+ * Created once per Spark task by [[org.apache.gluten.runtime.Runtime]]. The ThreadManager wraps a
+ * [[NativeThreadInitializer]] that propagates task context to native worker threads spawned by
+ * folly executors.
*/
trait NativeThreadManager {
/** @return opaque native handle passed to RuntimeJniWrapper#createRuntime. */
def getHandle(): Long
+
+ /** Release the native ThreadManager handle. Called by Runtime during task completion. */
+ def release(): Unit
}
object NativeThreadManager {
private class Impl(
private val backendName: String,
private val initializer: NativeThreadInitializer)
- extends NativeThreadManager
- with TaskResource {
+ extends NativeThreadManager {
private val handle = NativeThreadManagerJniWrapper.create(backendName,
initializer)
private val released = new AtomicBoolean(false)
@@ -49,20 +49,15 @@ object NativeThreadManager {
override def release(): Unit = {
if (!released.compareAndSet(false, true)) {
throw new GlutenException(
- s"Thread manager instance already released: $handle,
${resourceName()}, ${priority()}")
+ s"Thread manager instance already released: $handle")
}
NativeThreadManagerJniWrapper.release(handle)
}
-
- // Release before MemoryManager (10) but after most other resources.
- override def priority(): Int = 20
-
- override def resourceName(): String = "ntm"
}
/**
- * Create a new NativeThreadManager and register it with the current Spark task's
- * [[TaskResources]] so it is automatically released when the task finishes.
+ * Create a new NativeThreadManager. The caller (typically Runtime) is responsible for calling
+ * `release()` when the manager is no longer needed.
*
* @param backendName
* the backend kind string (e.g., "velox").
@@ -70,6 +65,6 @@ object NativeThreadManager {
* callback invoked when native worker threads are created / destroyed.
*/
def apply(backendName: String, initializer: NativeThreadInitializer):
NativeThreadManager = {
- TaskResources.addAnonymousResource(new Impl(backendName, initializer))
+ new Impl(backendName, initializer)
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]