This is an automated email from the ASF dual-hosted git repository.
agrove pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion-comet.git
The following commit(s) were added to refs/heads/main by this push:
new 2c832b4a feat: Require offHeap memory to be enabled (always use
unified memory) (#1062)
2c832b4a is described below
commit 2c832b4a56eafa3dacbe3ef31d99adabccb803bf
Author: Andy Grove <[email protected]>
AuthorDate: Thu Nov 14 12:55:57 2024 -0700
feat: Require offHeap memory to be enabled (always use unified memory)
(#1062)
* Require offHeap memory
* remove unused import
* use off heap memory in stability tests
* reorder imports
---
docs/source/user-guide/tuning.md | 32 ++--------------------
native/core/src/execution/jni_api.rs | 24 ++--------------
.../scala/org/apache/comet/CometExecIterator.scala | 11 +-------
.../apache/comet/CometSparkSessionExtensions.scala | 7 +++++
.../spark/sql/comet/CometPlanStabilitySuite.scala | 3 ++
5 files changed, 16 insertions(+), 61 deletions(-)
diff --git a/docs/source/user-guide/tuning.md b/docs/source/user-guide/tuning.md
index 30ada4c9..b1838ca8 100644
--- a/docs/source/user-guide/tuning.md
+++ b/docs/source/user-guide/tuning.md
@@ -23,40 +23,12 @@ Comet provides some tuning options to help you get the best
performance from you
## Memory Tuning
-Comet provides two options for memory management:
-
-- **Unified Memory Management** shares an off-heap memory pool between Spark
and Comet. This is the recommended option.
-- **Native Memory Management** leverages DataFusion's memory management for
the native plans and allocates memory independently of Spark.
-
-### Unified Memory Management
-
-This option is automatically enabled when `spark.memory.offHeap.enabled=true`.
+Comet and Spark share a single off-heap memory pool. This requires
setting `spark.memory.offHeap.enabled=true`.
+If this setting is not enabled, Comet will not accelerate queries and will
fall back to Spark.
Each executor will have a single memory pool which will be shared by all
native plans being executed within that
process, and by Spark itself. The size of the pool is specified by
`spark.memory.offHeap.size`.
-### Native Memory Management
-
-This option is automatically enabled when `spark.memory.offHeap.enabled=false`.
-
-Each native plan has a dedicated memory pool.
-
-By default, the size of each pool is `spark.comet.memory.overhead.factor *
spark.executor.memory`. The default value
-for `spark.comet.memory.overhead.factor` is `0.2`.
-
-It is important to take executor concurrency into account. The maximum number
of concurrent plans in an executor can
-be calculated with `spark.executor.cores / spark.task.cpus`.
-
-For example, if the executor can execute 4 plans concurrently, then the total
amount of memory allocated will be
-`4 * spark.comet.memory.overhead.factor * spark.executor.memory`.
-
-It is also possible to set `spark.comet.memoryOverhead` to the desired size
for each pool, rather than calculating
-it based on `spark.comet.memory.overhead.factor`.
-
-If both `spark.comet.memoryOverhead` and `spark.comet.memory.overhead.factor`
are set, the former will be used.
-
-Comet will allocate at least `spark.comet.memory.overhead.min` memory per pool.
-
### Determining How Much Memory to Allocate
Generally, increasing memory overhead will improve query performance,
especially for queries containing joins and
diff --git a/native/core/src/execution/jni_api.rs
b/native/core/src/execution/jni_api.rs
index d7e8ccab..47d87fe1 100644
--- a/native/core/src/execution/jni_api.rs
+++ b/native/core/src/execution/jni_api.rs
@@ -202,27 +202,9 @@ fn prepare_datafusion_session_context(
let mut rt_config =
RuntimeConfig::new().with_disk_manager(DiskManagerConfig::NewOs);
- // Check if we are using unified memory manager integrated with Spark.
Default to false if not
- // set.
- let use_unified_memory_manager = parse_bool(conf,
"use_unified_memory_manager")?;
-
- if use_unified_memory_manager {
- // Set Comet memory pool for native
- let memory_pool = CometMemoryPool::new(comet_task_memory_manager);
- rt_config = rt_config.with_memory_pool(Arc::new(memory_pool));
- } else {
- // Use the memory pool from DF
- if conf.contains_key("memory_limit") {
- let memory_limit =
conf.get("memory_limit").unwrap().parse::<usize>()?;
- let memory_fraction = conf
- .get("memory_fraction")
- .ok_or(CometError::Internal(
- "Config 'memory_fraction' is not specified from Comet JVM
side".to_string(),
- ))?
- .parse::<f64>()?;
- rt_config = rt_config.with_memory_limit(memory_limit,
memory_fraction)
- }
- }
+ // Set Comet memory pool for native
+ let memory_pool = CometMemoryPool::new(comet_task_memory_manager);
+ rt_config = rt_config.with_memory_pool(Arc::new(memory_pool));
// Get Datafusion configuration from Spark Execution context
// can be configured in Comet Spark JVM using Spark --conf parameters
diff --git a/spark/src/main/scala/org/apache/comet/CometExecIterator.scala
b/spark/src/main/scala/org/apache/comet/CometExecIterator.scala
index 07dd80c3..b1f22726 100644
--- a/spark/src/main/scala/org/apache/comet/CometExecIterator.scala
+++ b/spark/src/main/scala/org/apache/comet/CometExecIterator.scala
@@ -23,7 +23,7 @@ import org.apache.spark._
import org.apache.spark.sql.comet.CometMetricNode
import org.apache.spark.sql.vectorized._
-import org.apache.comet.CometConf.{COMET_BATCH_SIZE, COMET_BLOCKING_THREADS,
COMET_DEBUG_ENABLED, COMET_EXEC_MEMORY_FRACTION, COMET_EXPLAIN_NATIVE_ENABLED,
COMET_WORKER_THREADS}
+import org.apache.comet.CometConf.{COMET_BATCH_SIZE, COMET_BLOCKING_THREADS,
COMET_DEBUG_ENABLED, COMET_EXPLAIN_NATIVE_ENABLED, COMET_WORKER_THREADS}
import org.apache.comet.vector.NativeUtil
/**
@@ -75,15 +75,6 @@ class CometExecIterator(
val result = new java.util.HashMap[String, String]()
val conf = SparkEnv.get.conf
- val maxMemory = CometSparkSessionExtensions.getCometMemoryOverhead(conf)
- // Only enable unified memory manager when off-heap mode is enabled.
Otherwise,
- // we'll use the built-in memory pool from DF, and initializes with
`memory_limit`
- // and `memory_fraction` below.
- result.put(
- "use_unified_memory_manager",
- String.valueOf(conf.get("spark.memory.offHeap.enabled", "false")))
- result.put("memory_limit", String.valueOf(maxMemory))
- result.put("memory_fraction",
String.valueOf(COMET_EXEC_MEMORY_FRACTION.get()))
result.put("batch_size", String.valueOf(COMET_BATCH_SIZE.get()))
result.put("debug_native", String.valueOf(COMET_DEBUG_ENABLED.get()))
result.put("explain_native",
String.valueOf(COMET_EXPLAIN_NATIVE_ENABLED.get()))
diff --git
a/spark/src/main/scala/org/apache/comet/CometSparkSessionExtensions.scala
b/spark/src/main/scala/org/apache/comet/CometSparkSessionExtensions.scala
index 6a5c0efe..1c4ffcf3 100644
--- a/spark/src/main/scala/org/apache/comet/CometSparkSessionExtensions.scala
+++ b/spark/src/main/scala/org/apache/comet/CometSparkSessionExtensions.scala
@@ -915,6 +915,13 @@ class CometSparkSessionExtensions
}
override def apply(plan: SparkPlan): SparkPlan = {
+
+ // Comet requires off-heap memory to be enabled
+ if ("true" != conf.getConfString("spark.memory.offHeap.enabled",
"false")) {
+ logInfo("Comet extension disabled because
spark.memory.offHeap.enabled=false")
+ return plan
+ }
+
// DataFusion doesn't have ANSI mode. For now we just disable CometExec
if ANSI mode is
// enabled.
if (isANSIEnabled(conf)) {
diff --git
a/spark/src/test/scala/org/apache/spark/sql/comet/CometPlanStabilitySuite.scala
b/spark/src/test/scala/org/apache/spark/sql/comet/CometPlanStabilitySuite.scala
index a553e61c..16a7e533 100644
---
a/spark/src/test/scala/org/apache/spark/sql/comet/CometPlanStabilitySuite.scala
+++
b/spark/src/test/scala/org/apache/spark/sql/comet/CometPlanStabilitySuite.scala
@@ -26,6 +26,7 @@ import scala.collection.mutable
import org.apache.commons.io.FileUtils
import org.apache.spark.SparkContext
+import org.apache.spark.internal.config.{MEMORY_OFFHEAP_ENABLED,
MEMORY_OFFHEAP_SIZE}
import org.apache.spark.sql.TPCDSBase
import org.apache.spark.sql.catalyst.expressions.AttributeSet
import org.apache.spark.sql.catalyst.util.resourceToString
@@ -286,6 +287,8 @@ trait CometPlanStabilitySuite extends
DisableAdaptiveExecutionSuite with TPCDSBa
conf.set(
"spark.shuffle.manager",
"org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager")
+ conf.set(MEMORY_OFFHEAP_ENABLED.key, "true")
+ conf.set(MEMORY_OFFHEAP_SIZE.key, "2g")
conf.set(CometConf.COMET_ENABLED.key, "true")
conf.set(CometConf.COMET_EXEC_ENABLED.key, "true")
conf.set(CometConf.COMET_MEMORY_OVERHEAD.key, "1g")
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]