This is an automated email from the ASF dual-hosted git repository.
agrove pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion-comet.git
The following commit(s) were added to refs/heads/main by this push:
new 75ddcfc3f chore: Making comet native operators write spill files to
spark local dir (#1581)
75ddcfc3f is described below
commit 75ddcfc3fc4393fa2b97ac90b7fb398f5833bf8f
Author: Kristin Cowalcijk <[email protected]>
AuthorDate: Sat Apr 5 02:17:52 2025 +0800
chore: Making comet native operators write spill files to spark local dir
(#1581)
* Use spark local dirs in comet
* Add unit test
---
native/core/src/execution/jni_api.rs | 22 ++++++++++--
.../scala/org/apache/comet/CometExecIterator.scala | 2 ++
spark/src/main/scala/org/apache/comet/Native.scala | 1 +
.../scala/org/apache/comet/CometNativeSuite.scala | 40 ++++++++++++++++++++--
4 files changed, 60 insertions(+), 5 deletions(-)
diff --git a/native/core/src/execution/jni_api.rs b/native/core/src/execution/jni_api.rs
index dd4d031b9..0779bae9a 100644
--- a/native/core/src/execution/jni_api.rs
+++ b/native/core/src/execution/jni_api.rs
@@ -38,6 +38,7 @@ use jni::{
sys::{jbyteArray, jint, jlong, jlongArray},
JNIEnv,
};
+use std::path::PathBuf;
use std::time::{Duration, Instant};
use std::{collections::HashMap, sync::Arc, task::Poll};
@@ -167,6 +168,7 @@ pub unsafe extern "system" fn
Java_org_apache_comet_Native_createPlan(
metrics_node: JObject,
metrics_update_interval: jlong,
comet_task_memory_manager_obj: JObject,
+ local_dirs: jobjectArray,
batch_size: jint,
off_heap_mode: jboolean,
memory_pool_type: jstring,
@@ -208,6 +210,8 @@ pub unsafe extern "system" fn Java_org_apache_comet_Native_createPlan(
let input_source = Arc::new(jni_new_global_ref!(env, input_source)?);
input_sources.push(input_source);
}
+
+ // Create DataFusion memory pool
let task_memory_manager =
Arc::new(jni_new_global_ref!(env, comet_task_memory_manager_obj)?);
@@ -221,10 +225,21 @@ pub unsafe extern "system" fn Java_org_apache_comet_Native_createPlan(
let memory_pool =
create_memory_pool(&memory_pool_config, task_memory_manager,
task_attempt_id);
+ // Get local directories for storing spill files
+ let local_dirs_array = JObjectArray::from_raw(local_dirs);
+ let num_local_dirs = env.get_array_length(&local_dirs_array)?;
+ let mut local_dirs = vec![];
+ for i in 0..num_local_dirs {
+ let local_dir: JString = env.get_object_array_element(&local_dirs_array, i)?.into();
+ let local_dir = env.get_string(&local_dir)?;
+ local_dirs.push(local_dir.into());
+ }
+
// We need to keep the session context alive. Some session state like temporary
// dictionaries are stored in session context. If it is dropped, the temporary
// dictionaries will be dropped as well.
- let session = prepare_datafusion_session_context(batch_size as usize, memory_pool)?;
+ let session =
+ prepare_datafusion_session_context(batch_size as usize, memory_pool, local_dirs)?;
let plan_creation_time = start.elapsed();
@@ -262,8 +277,11 @@ pub unsafe extern "system" fn Java_org_apache_comet_Native_createPlan(
fn prepare_datafusion_session_context(
batch_size: usize,
memory_pool: Arc<dyn MemoryPool>,
+ local_dirs: Vec<String>,
) -> CometResult<SessionContext> {
- let mut rt_config = RuntimeEnvBuilder::new().with_disk_manager(DiskManagerConfig::NewOs);
+ let disk_manager_config =
+ DiskManagerConfig::NewSpecified(local_dirs.into_iter().map(PathBuf::from).collect());
+ let mut rt_config = RuntimeEnvBuilder::new().with_disk_manager(disk_manager_config);
rt_config = rt_config.with_memory_pool(memory_pool);
// Get Datafusion configuration from Spark Execution context
diff --git a/spark/src/main/scala/org/apache/comet/CometExecIterator.scala b/spark/src/main/scala/org/apache/comet/CometExecIterator.scala
index b9bb3b2b9..f409c79e3 100644
--- a/spark/src/main/scala/org/apache/comet/CometExecIterator.scala
+++ b/spark/src/main/scala/org/apache/comet/CometExecIterator.scala
@@ -64,6 +64,7 @@ class CometExecIterator(
}.toArray
private val plan = {
val conf = SparkEnv.get.conf
+ val localDiskDirs = SparkEnv.get.blockManager.getLocalDiskDirs
val offHeapMode = CometSparkSessionExtensions.isOffHeapEnabled(conf)
val memoryLimit = if (offHeapMode) {
@@ -83,6 +84,7 @@ class CometExecIterator(
nativeMetrics,
metricsUpdateInterval = COMET_METRICS_UPDATE_INTERVAL.get(),
new CometTaskMemoryManager(id),
+ localDiskDirs,
batchSize = COMET_BATCH_SIZE.get(),
offHeapMode,
memoryPoolType = COMET_EXEC_MEMORY_POOL_TYPE.get(),
diff --git a/spark/src/main/scala/org/apache/comet/Native.scala b/spark/src/main/scala/org/apache/comet/Native.scala
index e466b2f4d..3a88622da 100644
--- a/spark/src/main/scala/org/apache/comet/Native.scala
+++ b/spark/src/main/scala/org/apache/comet/Native.scala
@@ -58,6 +58,7 @@ class Native extends NativeBase {
metrics: CometMetricNode,
metricsUpdateInterval: Long,
taskMemoryManager: CometTaskMemoryManager,
+ localDirs: Array[String],
batchSize: Int,
offHeapMode: Boolean,
memoryPoolType: String,
diff --git a/spark/src/test/scala/org/apache/comet/CometNativeSuite.scala b/spark/src/test/scala/org/apache/comet/CometNativeSuite.scala
index 2ff38eae8..325ef51f6 100644
--- a/spark/src/test/scala/org/apache/comet/CometNativeSuite.scala
+++ b/spark/src/test/scala/org/apache/comet/CometNativeSuite.scala
@@ -19,7 +19,7 @@
package org.apache.comet
-import org.apache.spark.SparkException
+import org.apache.spark.{SparkEnv, SparkException}
import org.apache.spark.sql.CometTestBase
import org.apache.spark.sql.catalyst.expressions.PrettyAttribute
import org.apache.spark.sql.comet.{CometExec, CometExecUtils}
@@ -40,8 +40,11 @@ class CometNativeSuite extends CometTestBase {
limitOp,
1,
0)
- cometIter.next()
- cometIter.close()
+ try {
+ cometIter.next()
+ } finally {
+ cometIter.close()
+ }
value
}
@@ -63,4 +66,35 @@ class CometNativeSuite extends CometTestBase {
}
assert(exception2.getMessage contains "null context handle")
}
+
+ test("Comet native should use spark local dir as temp dir") {
+ withParquetTable((0 until 100000).map(i => (i, i + 1)), "table") {
+ val dirs = SparkEnv.get.blockManager.getLocalDiskDirs
+ dirs.foreach { dir =>
+ val files = new java.io.File(dir).listFiles()
+ assert(!files.exists(f => f.isDirectory && f.getName.startsWith("datafusion-")))
+ }
+
+ // Check if the DataFusion temporary dir exists in the Spark local dirs when a spark job involving
+ // Comet native operator is running.
+ val observedDataFusionDir = spark
+ .table("table")
+ .selectExpr("_1 + _2 as value")
+ .rdd
+ .mapPartitions { _ =>
+ dirs.map { dir =>
+ val files = new java.io.File(dir).listFiles()
+ files.count(f => f.isDirectory && f.getName.startsWith("datafusion-"))
+ }.iterator
+ }
+ .sum()
+ assert(observedDataFusionDir > 0)
+
+ // DataFusion temporary dir should be cleaned up after the job is done.
+ dirs.foreach { dir =>
+ val files = new java.io.File(dir).listFiles()
+ assert(!files.exists(f => f.isDirectory && f.getName.startsWith("datafusion-")))
+ }
+ }
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]