This is an automated email from the ASF dual-hosted git repository. taiyang-li pushed a commit to branch fake_add_bolt_backend in repository https://gitbox.apache.org/repos/asf/gluten.git
commit b6c7ba6c080f985401b4180344e1aea893e42167 Author: liyang.127 <[email protected]> AuthorDate: Fri Jun 26 15:35:15 2026 +0800 [GLUTEN][CORE] Pass Spark task attempt id and pool name from Java to native runtime/memory-manager This patch extends RuntimeJniWrapper.createRuntime and the NativeMemoryManagerJniWrapper create/hold/release JNI methods with extra parameters that propagate the Spark task attempt id (and a memory-pool name) from the JVM down to the native side. Unlike the previous getter/setter approach, the task attempt id is now passed through the Runtime constructor/factory chain (and the pool name through the MemoryManager constructor/factory chain), so backends receive the value at construction time. The native ThreadManager wiring from the community is preserved. No behavior change for existing Velox backends; this unblocks future backends (e.g. Bolt) that need task-level identification at the native layer with minimal rebase conflicts. Generated-by: Claude claude-sonnet-4-6 --- cpp/core/compute/Runtime.cc | 5 +-- cpp/core/compute/Runtime.h | 20 +++++++++--- cpp/core/jni/JniWrapper.cc | 38 ++++++++++++++-------- cpp/core/memory/MemoryManager.cc | 5 +-- cpp/core/memory/MemoryManager.h | 13 ++++++-- cpp/velox/compute/VeloxBackend.cc | 12 ++++--- cpp/velox/compute/VeloxRuntime.cc | 5 +-- cpp/velox/compute/VeloxRuntime.h | 3 +- cpp/velox/memory/VeloxMemoryManager.cc | 5 +-- cpp/velox/memory/VeloxMemoryManager.h | 3 +- .../memory/NativeMemoryManagerJniWrapper.java | 6 ++-- .../apache/gluten/runtime/RuntimeJniWrapper.java | 2 +- .../apache/gluten/memory/NativeMemoryManager.scala | 9 +++-- .../scala/org/apache/gluten/runtime/Runtime.scala | 4 ++- 14 files changed, 88 insertions(+), 42 deletions(-) diff --git a/cpp/core/compute/Runtime.cc b/cpp/core/compute/Runtime.cc index f283bc7283..83e724b28a 100644 --- a/cpp/core/compute/Runtime.cc +++ b/cpp/core/compute/Runtime.cc @@ -41,9 +41,10 @@ Runtime* Runtime::create( const std::string& kind, MemoryManager* memoryManager, ThreadManager* threadManager, - const std::unordered_map<std::string, std::string>& sessionConf) { + const std::unordered_map<std::string, std::string>& sessionConf, + int64_t taskId) { auto& factory = runtimeFactories().get(kind); - return factory(kind, memoryManager, threadManager, sessionConf); + return factory(kind, memoryManager, threadManager, sessionConf, taskId); } void Runtime::release(Runtime* runtime) { diff --git a/cpp/core/compute/Runtime.h b/cpp/core/compute/Runtime.h index 07120684d3..ae9ce0c56e 100644 --- a/cpp/core/compute/Runtime.h +++ b/cpp/core/compute/Runtime.h @@ -68,14 +68,16 @@ class Runtime : public std::enable_shared_from_this<Runtime> { const std::string& kind, MemoryManager* memoryManager, ThreadManager* threadManager, - const std::unordered_map<std::string, std::string>& sessionConf)>; + const std::unordered_map<std::string, std::string>& sessionConf, + int64_t taskId)>; using Releaser = std::function<void(Runtime*)>; static void registerFactory(const std::string& kind, Factory factory, Releaser releaser); static Runtime* create( const std::string& kind, MemoryManager* memoryManager, ThreadManager* threadManager, - const std::unordered_map<std::string, std::string>& sessionConf = {}); + const std::unordered_map<std::string, std::string>& sessionConf = {}, + int64_t taskId = -1); static void release(Runtime*); static std::optional<std::string>* localWriteFilesTempPath(); static std::optional<std::string>* localWriteFileName(); @@ -84,8 +86,13 @@ class Runtime : public std::enable_shared_from_this<Runtime> { const std::string& kind, MemoryManager* memoryManager, ThreadManager* threadManager, - const std::unordered_map<std::string, std::string>& confMap) - : kind_(kind), memoryManager_(memoryManager), threadManager_(threadManager), confMap_(confMap) {} + const std::unordered_map<std::string, std::string>& confMap, + int64_t taskId = -1) + : kind_(kind), + memoryManager_(memoryManager), + threadManager_(threadManager), + confMap_(confMap), + taskId_(taskId) {} virtual ~Runtime() = default; @@ -194,6 +201,10 @@ class Runtime : public std::enable_shared_from_this<Runtime> { return objStore_->save(obj); } + int64_t taskId() const { + return taskId_; + } + protected: std::string kind_; MemoryManager* memoryManager_; @@ -206,5 +217,6 @@ class Runtime : public std::enable_shared_from_this<Runtime> { std::optional<SparkTaskInfo> taskInfo_{std::nullopt}; std::shared_ptr<WholeStageDumper> dumper_{nullptr}; + int64_t taskId_{-1}; }; } // namespace gluten diff --git a/cpp/core/jni/JniWrapper.cc b/cpp/core/jni/JniWrapper.cc index efdfd36cba..af01d92ca0 100644 --- a/cpp/core/jni/JniWrapper.cc +++ b/cpp/core/jni/JniWrapper.cc @@ -156,7 +156,7 @@ inline static const std::string kInternalBackendKind{"internal"}; class InternalMemoryManager : public MemoryManager { public: - InternalMemoryManager(const std::string& kind) : MemoryManager(kind) {} + InternalMemoryManager(const std::string& kind, const std::string& name) : MemoryManager(kind, name) {} arrow::MemoryPool* defaultArrowMemoryPool() override { throw GlutenException("Not implemented"); @@ -183,12 +183,16 @@ class InternalRuntime : public Runtime { const std::string& kind, MemoryManager* memoryManager, ThreadManager* threadManager, - const std::unordered_map<std::string, std::string>& confMap) - : Runtime(kind, memoryManager, threadManager, confMap) {} + const std::unordered_map<std::string, std::string>& confMap, + int64_t taskId) + : Runtime(kind, memoryManager, threadManager, confMap, taskId) {} }; -MemoryManager* internalMemoryManagerFactory(const std::string& kind, std::unique_ptr<AllocationListener> listener) { - return new InternalMemoryManager(kind); +MemoryManager* internalMemoryManagerFactory( + const std::string& kind, + std::unique_ptr<AllocationListener> listener, + const std::string& name) { + return new InternalMemoryManager(kind, name); } void internalMemoryManagerReleaser(MemoryManager* memoryManager) { @@ -220,8 +224,9 @@ Runtime* internalRuntimeFactory( const std::string& kind, MemoryManager* memoryManager, ThreadManager* threadManager, - const std::unordered_map<std::string, std::string>& sessionConf) { - return new InternalRuntime(kind, memoryManager, threadManager, sessionConf); + const std::unordered_map<std::string, std::string>& sessionConf, + int64_t taskId) { + return new InternalRuntime(kind, memoryManager, threadManager, sessionConf, taskId); } void internalRuntimeReleaser(Runtime* runtime) { @@ -345,7 +350,8 @@ JNIEXPORT jlong JNICALL Java_org_apache_gluten_runtime_RuntimeJniWrapper_createR jstring jBackendType, jlong nmmHandle, jlong ntmHandle, - jbyteArray sessionConf) { + jbyteArray sessionConf, + jlong taskAttemptId) { JNI_METHOD_START MemoryManager* memoryManager = jniCastOrThrow<MemoryManager>(nmmHandle); ThreadManager* threadManager = jniCastOrThrow<ThreadManager>(ntmHandle); @@ -353,7 +359,8 @@ JNIEXPORT jlong JNICALL Java_org_apache_gluten_runtime_RuntimeJniWrapper_createR auto sparkConf = parseConfMap(env, safeArray.elems(), safeArray.length()); auto backendType = jStringToCString(env, jBackendType); - auto runtime = Runtime::create(backendType, memoryManager, threadManager, sparkConf); + auto runtime = + Runtime::create(backendType, memoryManager, threadManager, sparkConf, static_cast<int64_t>(taskAttemptId)); return reinterpret_cast<jlong>(runtime); JNI_METHOD_END(kInvalidObjectHandle) } @@ -378,13 +385,15 @@ JNIEXPORT jlong JNICALL Java_org_apache_gluten_memory_NativeMemoryManagerJniWrap jclass, jstring jBackendType, jobject jListener, - jbyteArray sessionConf) { + jbyteArray sessionConf, + jstring jName) { JNI_METHOD_START JavaVM* vm; if (env->GetJavaVM(&vm) != JNI_OK) { throw GlutenException("Unable to get JavaVM instance"); } auto backendType = jStringToCString(env, jBackendType); + auto name = jStringToCString(env, jName); auto safeArray = getByteArrayElementsSafe(env, sessionConf); auto sparkConf = parseConfMap(env, safeArray.elems(), safeArray.length()); std::unique_ptr<AllocationListener> listener = std::make_unique<SparkAllocationListener>(vm, jListener); @@ -392,7 +401,7 @@ JNIEXPORT jlong JNICALL Java_org_apache_gluten_memory_NativeMemoryManagerJniWrap if (backtrace) { listener = std::make_unique<BacktraceAllocationListener>(std::move(listener)); } - MemoryManager* mm = MemoryManager::create(backendType, std::move(listener)); + MemoryManager* mm = MemoryManager::create(backendType, std::move(listener), name); return reinterpret_cast<jlong>(mm); JNI_METHOD_END(-1L) } @@ -457,7 +466,9 @@ JNIEXPORT jlong JNICALL Java_org_apache_gluten_memory_NativeMemoryManagerJniWrap JNIEXPORT void JNICALL Java_org_apache_gluten_memory_NativeMemoryManagerJniWrapper_hold( // NOLINT JNIEnv* env, jclass, - jlong nmmHandle) { + jlong nmmHandle, + jstring jName, + jlong taskAttemptId) { JNI_METHOD_START auto* memoryManager = jniCastOrThrow<MemoryManager>(nmmHandle); memoryManager->hold(); @@ -467,7 +478,8 @@ JNIEXPORT void JNICALL Java_org_apache_gluten_memory_NativeMemoryManagerJniWrapp JNIEXPORT void JNICALL Java_org_apache_gluten_memory_NativeMemoryManagerJniWrapper_release( // NOLINT JNIEnv* env, jclass, - jlong nmmHandle) { + jlong nmmHandle, + jlong taskAttemptId) { JNI_METHOD_START auto* memoryManager = jniCastOrThrow<MemoryManager>(nmmHandle); MemoryManager::release(memoryManager); diff --git a/cpp/core/memory/MemoryManager.cc b/cpp/core/memory/MemoryManager.cc index 8ccf8af2cc..07984c51d8 100644 --- a/cpp/core/memory/MemoryManager.cc +++ b/cpp/core/memory/MemoryManager.cc @@ -35,9 +35,10 @@ void MemoryManager::registerFactory(const std::string& kind, MemoryManager::Fact memoryManagerReleasers().registerObj(kind, std::move(releaser)); } -MemoryManager* MemoryManager::create(const std::string& kind, std::unique_ptr<AllocationListener> listener) { +MemoryManager* +MemoryManager::create(const std::string& kind, std::unique_ptr<AllocationListener> listener, const std::string& name) { auto& factory = memoryManagerFactories().get(kind); - return factory(kind, std::move(listener)); + return factory(kind, std::move(listener), name); } void MemoryManager::release(MemoryManager* memoryManager) { diff --git a/cpp/core/memory/MemoryManager.h b/cpp/core/memory/MemoryManager.h index 93d875ad1e..9b003086b9 100644 --- a/cpp/core/memory/MemoryManager.h +++ b/cpp/core/memory/MemoryManager.h @@ -25,13 +25,15 @@ namespace gluten { class MemoryManager { public: - using Factory = std::function<MemoryManager*(const std::string& kind, std::unique_ptr<AllocationListener> listener)>; + using Factory = std::function< + MemoryManager*(const std::string& kind, std::unique_ptr<AllocationListener> listener, const std::string& name)>; using Releaser = std::function<void(MemoryManager*)>; static void registerFactory(const std::string& kind, Factory factory, Releaser releaser); - static MemoryManager* create(const std::string& kind, std::unique_ptr<AllocationListener> listener); + static MemoryManager* + create(const std::string& kind, std::unique_ptr<AllocationListener> listener, const std::string& name = ""); static void release(MemoryManager*); - MemoryManager(const std::string& kind) : kind_(kind){}; + MemoryManager(const std::string& kind, const std::string& name = "") : kind_(kind), name_(name){}; virtual ~MemoryManager() = default; @@ -39,6 +41,10 @@ class MemoryManager { return kind_; } + std::string name() const { + return name_; + } + // Get the default Arrow memory pool for this memory manager. This memory pool is held by the memory manager. virtual arrow::MemoryPool* defaultArrowMemoryPool() = 0; @@ -58,6 +64,7 @@ class MemoryManager { private: std::string kind_; + std::string name_; }; } // namespace gluten diff --git a/cpp/velox/compute/VeloxBackend.cc b/cpp/velox/compute/VeloxBackend.cc index aa77d0ffe7..8bdd3fac0f 100644 --- a/cpp/velox/compute/VeloxBackend.cc +++ b/cpp/velox/compute/VeloxBackend.cc @@ -77,8 +77,11 @@ using namespace facebook; namespace gluten { namespace { -MemoryManager* veloxMemoryManagerFactory(const std::string& kind, std::unique_ptr<AllocationListener> listener) { - return new VeloxMemoryManager(kind, std::move(listener), *VeloxBackend::get()->getBackendConf()); +MemoryManager* veloxMemoryManagerFactory( + const std::string& kind, + std::unique_ptr<AllocationListener> listener, + const std::string& name) { + return new VeloxMemoryManager(kind, std::move(listener), *VeloxBackend::get()->getBackendConf(), name); } void veloxMemoryManagerReleaser(MemoryManager* memoryManager) { @@ -89,10 +92,11 @@ Runtime* veloxRuntimeFactory( const std::string& kind, MemoryManager* memoryManager, ThreadManager* threadManager, - const std::unordered_map<std::string, std::string>& sessionConf) { + const std::unordered_map<std::string, std::string>& sessionConf, + int64_t taskId) { auto* vmm = dynamic_cast<VeloxMemoryManager*>(memoryManager); GLUTEN_CHECK(vmm != nullptr, "Not a Velox memory manager"); - return new VeloxRuntime(kind, vmm, threadManager, sessionConf); + return new VeloxRuntime(kind, vmm, threadManager, sessionConf, taskId); } void veloxRuntimeReleaser(Runtime* runtime) { diff --git a/cpp/velox/compute/VeloxRuntime.cc b/cpp/velox/compute/VeloxRuntime.cc index f13430bd0c..a9155eee4b 100644 --- a/cpp/velox/compute/VeloxRuntime.cc +++ b/cpp/velox/compute/VeloxRuntime.cc @@ -238,8 +238,9 @@ VeloxRuntime::VeloxRuntime( const std::string& kind, VeloxMemoryManager* vmm, ThreadManager* threadManager, - const std::unordered_map<std::string, std::string>& confMap) - : Runtime(kind, vmm, threadManager, confMap) { + const std::unordered_map<std::string, std::string>& confMap, + int64_t taskId) + : Runtime(kind, vmm, threadManager, confMap, taskId) { // Refresh session config. veloxCfg_ = std::make_shared<facebook::velox::config::ConfigBase>(std::unordered_map<std::string, std::string>(confMap_)); diff --git a/cpp/velox/compute/VeloxRuntime.h b/cpp/velox/compute/VeloxRuntime.h index c6ee1c462c..5e04f254cd 100644 --- a/cpp/velox/compute/VeloxRuntime.h +++ b/cpp/velox/compute/VeloxRuntime.h @@ -43,7 +43,8 @@ class VeloxRuntime final : public Runtime { const std::string& kind, VeloxMemoryManager* vmm, ThreadManager* threadManager, - const std::unordered_map<std::string, std::string>& confMap); + const std::unordered_map<std::string, std::string>& confMap, + int64_t taskId = -1); ~VeloxRuntime() override; diff --git a/cpp/velox/memory/VeloxMemoryManager.cc b/cpp/velox/memory/VeloxMemoryManager.cc index ad46a6ce9b..1bfd487b69 100644 --- a/cpp/velox/memory/VeloxMemoryManager.cc +++ b/cpp/velox/memory/VeloxMemoryManager.cc @@ -221,8 +221,9 @@ ArbitratorFactoryRegister::~ArbitratorFactoryRegister() { VeloxMemoryManager::VeloxMemoryManager( const std::string& kind, std::unique_ptr<AllocationListener> listener, - const facebook::velox::config::ConfigBase& backendConf) - : MemoryManager(kind), listener_(std::move(listener)) { + const facebook::velox::config::ConfigBase& backendConf, + const std::string& name) + : MemoryManager(kind, name), listener_(std::move(listener)) { auto reservationBlockSize = backendConf.get<uint64_t>(kMemoryReservationBlockSize, kMemoryReservationBlockSizeDefault); blockListener_ = std::make_unique<BlockAllocationListener>(listener_.get(), reservationBlockSize); diff --git a/cpp/velox/memory/VeloxMemoryManager.h b/cpp/velox/memory/VeloxMemoryManager.h index 9d4b9e9a9b..216ce88b8b 100644 --- a/cpp/velox/memory/VeloxMemoryManager.h +++ b/cpp/velox/memory/VeloxMemoryManager.h @@ -59,7 +59,8 @@ class VeloxMemoryManager final : public MemoryManager { VeloxMemoryManager( const std::string& kind, std::unique_ptr<AllocationListener> listener, - const facebook::velox::config::ConfigBase& backendConf); + const facebook::velox::config::ConfigBase& backendConf, + const std::string& name = ""); ~VeloxMemoryManager() override; VeloxMemoryManager(const VeloxMemoryManager&) = delete; diff --git a/gluten-arrow/src/main/java/org/apache/gluten/memory/NativeMemoryManagerJniWrapper.java b/gluten-arrow/src/main/java/org/apache/gluten/memory/NativeMemoryManagerJniWrapper.java index c23b9704ec..15dc8f3152 100644 --- a/gluten-arrow/src/main/java/org/apache/gluten/memory/NativeMemoryManagerJniWrapper.java +++ b/gluten-arrow/src/main/java/org/apache/gluten/memory/NativeMemoryManagerJniWrapper.java @@ -22,13 +22,13 @@ public class NativeMemoryManagerJniWrapper { private NativeMemoryManagerJniWrapper() {} public static native long create( - String backendType, ReservationListener listener, byte[] sessionConf); + String backendType, ReservationListener listener, byte[] sessionConf, String name); public static native byte[] collectUsage(long handle); public static native long shrink(long handle, long size); - public static native void hold(long handle); + public static native void hold(long handle, String name, long taskAttemptId); - public static native void release(long handle); + public static native void release(long handle, long taskAttemptId); } diff --git a/gluten-arrow/src/main/java/org/apache/gluten/runtime/RuntimeJniWrapper.java b/gluten-arrow/src/main/java/org/apache/gluten/runtime/RuntimeJniWrapper.java index ed22759672..cb22250e4e 100644 --- a/gluten-arrow/src/main/java/org/apache/gluten/runtime/RuntimeJniWrapper.java +++ b/gluten-arrow/src/main/java/org/apache/gluten/runtime/RuntimeJniWrapper.java @@ -21,7 +21,7 @@ public class RuntimeJniWrapper { private RuntimeJniWrapper() {} public static native long createRuntime( - String backendType, long nmm, long ntm, byte[] sessionConf); + String backendType, long nmm, long ntm, byte[] sessionConf, long taskAttemptId); public static native void releaseRuntime(long handle); } diff --git a/gluten-arrow/src/main/scala/org/apache/gluten/memory/NativeMemoryManager.scala b/gluten-arrow/src/main/scala/org/apache/gluten/memory/NativeMemoryManager.scala index 159e1bba5e..ed3fba8ee3 100644 --- a/gluten-arrow/src/main/scala/org/apache/gluten/memory/NativeMemoryManager.scala +++ b/gluten-arrow/src/main/scala/org/apache/gluten/memory/NativeMemoryManager.scala @@ -23,6 +23,7 @@ import org.apache.gluten.memory.memtarget.{KnownNameAndStats, MemoryTarget, Spil import org.apache.gluten.proto.MemoryUsageStats import org.apache.gluten.utils.ConfigUtil +import org.apache.spark.TaskContext import org.apache.spark.memory.SparkMemoryUtil import org.apache.spark.sql.internal.{GlutenConfigUtil, SQLConf} import org.apache.spark.task.{TaskResource, TaskResources} @@ -54,7 +55,8 @@ object NativeMemoryManager { ConfigUtil.serialize( GlutenConfig .getNativeSessionConf(backendName, GlutenConfigUtil.parseConfig(SQLConf.get.getAllConfs)) - .asJava) + .asJava), + name ) spillers.append(new Spiller() { override def spill(self: MemoryTarget, phase: Spiller.Phase, size: Long): Long = phase match { @@ -76,7 +78,8 @@ object NativeMemoryManager { private val released: AtomicBoolean = new AtomicBoolean(false) override def addSpiller(spiller: Spiller): Unit = spillers.append(spiller) - override def hold(): Unit = NativeMemoryManagerJniWrapper.hold(handle) + override def hold(): Unit = + NativeMemoryManagerJniWrapper.hold(handle, name, TaskContext.get().taskAttemptId()) override def getHandle(): Long = handle override def release(): Unit = { if (!released.compareAndSet(false, true)) { @@ -97,7 +100,7 @@ object NativeMemoryManager { LOGGER.debug("About to release memory manager, " + dump()) } - NativeMemoryManagerJniWrapper.release(handle) + NativeMemoryManagerJniWrapper.release(handle, TaskContext.get().taskAttemptId()) if (rl.getUsedBytes != 0) { LOGGER.warn( diff --git a/gluten-arrow/src/main/scala/org/apache/gluten/runtime/Runtime.scala b/gluten-arrow/src/main/scala/org/apache/gluten/runtime/Runtime.scala index 2e3c468140..18b7589de2 100644 --- a/gluten-arrow/src/main/scala/org/apache/gluten/runtime/Runtime.scala +++ b/gluten-arrow/src/main/scala/org/apache/gluten/runtime/Runtime.scala @@ -22,6 +22,7 @@ import org.apache.gluten.memory.NativeMemoryManager import org.apache.gluten.threads.{NativeThreadManager, TaskChildThreadInitializer} import org.apache.gluten.utils.ConfigUtil +import org.apache.spark.TaskContext import org.apache.spark.sql.internal.{GlutenConfigUtil, SQLConf} import org.apache.spark.task.{TaskResource, TaskResources} @@ -64,7 +65,8 @@ object Runtime { (GlutenConfig .getNativeSessionConf( backendName, - GlutenConfigUtil.parseConfig(SQLConf.get.getAllConfs)) ++ extraConf.asScala).asJava) + GlutenConfigUtil.parseConfig(SQLConf.get.getAllConfs)) ++ extraConf.asScala).asJava), + TaskContext.get().taskAttemptId() ) private val released: AtomicBoolean = new AtomicBoolean(false) --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
