This is an automated email from the ASF dual-hosted git repository.
hongze pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git
The following commit(s) were added to refs/heads/main by this push:
new 083ecfbbe9 [VL] Move pre-configuration code of dynamic off-heap sizing to its own place (#9336)
083ecfbbe9 is described below
commit 083ecfbbe9cbb8d2dffd86c177ac5e1debdac973
Author: Hongze Zhang <[email protected]>
AuthorDate: Thu Apr 17 17:10:25 2025 +0100
[VL] Move pre-configuration code of dynamic off-heap sizing to its own place (#9336)
---
.../DynamicOffHeapSizingMemoryTarget.java | 77 +++++++++++++++-------
.../gluten/memory/memtarget/MemoryTargets.java | 2 +-
.../scala/org/apache/gluten/GlutenPlugin.scala | 75 ++++-----------------
.../org/apache/spark/memory/SparkMemoryUtil.scala | 4 +-
.../org/apache/gluten/config/GlutenConfig.scala | 9 ++-
5 files changed, 75 insertions(+), 92 deletions(-)
diff --git a/gluten-core/src/main/java/org/apache/gluten/memory/memtarget/DynamicOffHeapSizingMemoryTarget.java b/gluten-core/src/main/java/org/apache/gluten/memory/memtarget/DynamicOffHeapSizingMemoryTarget.java
index 4056df062e..6b981903f4 100644
--- a/gluten-core/src/main/java/org/apache/gluten/memory/memtarget/DynamicOffHeapSizingMemoryTarget.java
+++ b/gluten-core/src/main/java/org/apache/gluten/memory/memtarget/DynamicOffHeapSizingMemoryTarget.java
@@ -17,6 +17,8 @@
package org.apache.gluten.memory.memtarget;
import org.apache.gluten.config.GlutenConfig;
+import org.apache.gluten.memory.SimpleMemoryUsageRecorder;
+import org.apache.gluten.proto.MemoryUsageStats;
import org.apache.spark.annotation.Experimental;
import org.slf4j.Logger;
@@ -24,19 +26,43 @@ import org.slf4j.LoggerFactory;
import java.util.concurrent.atomic.AtomicLong;
+/**
+ * The memory target used by dynamic off-heap sizing. Since
+ * https://github.com/apache/incubator-gluten/issues/5439.
+ */
@Experimental
-public class DynamicOffHeapSizingMemoryTarget implements MemoryTarget {
+public class DynamicOffHeapSizingMemoryTarget implements MemoryTarget,
KnownNameAndStats {
+
private static final Logger LOG =
LoggerFactory.getLogger(DynamicOffHeapSizingMemoryTarget.class);
- private final MemoryTarget delegated;
// When dynamic off-heap sizing is enabled, the off-heap should be sized for
the total usable
// memory, so we can use it as the max memory we will use.
- private static final long MAX_MEMORY_IN_BYTES =
GlutenConfig.get().offHeapMemorySize();
- private static final AtomicLong USED_OFFHEAP_BYTES = new AtomicLong();
-
- public DynamicOffHeapSizingMemoryTarget(MemoryTarget delegated) {
- this.delegated = delegated;
+ private static final long TOTAL_MEMORY_SHARED;
+
+ static {
+ final long maxOnHeapSize = Runtime.getRuntime().maxMemory();
+ final double fractionForSizing =
GlutenConfig.get().dynamicOffHeapSizingMemoryFraction();
+ // Since when dynamic off-heap sizing is enabled, we commingle on-heap
+ // and off-heap memory, we set the off-heap size to the usable on-heap
size. We will
+ // size it with a memory fraction, which can be aggressively set, but the
default
+ // is using the same way that Spark sizes on-heap memory:
+ //
+ // spark.gluten.memory.dynamic.offHeap.sizing.memory.fraction *
+ // (spark.executor.memory - 300MB).
+ //
+ // We will be careful to use the same configuration settings as Spark to
ensure
+ // that we are sizing the off-heap memory in the same way as Spark sizes
on-heap memory.
+ // The 300MB value, unfortunately, is hard-coded in Spark code.
+ TOTAL_MEMORY_SHARED = (long) ((maxOnHeapSize - (300 * 1024 * 1024)) *
fractionForSizing);
+ LOG.info("DynamicOffHeapSizingMemoryTarget MAX_MEMORY_IN_BYTES: {}",
TOTAL_MEMORY_SHARED);
}
+ private static final AtomicLong USED_OFF_HEAP_BYTES = new AtomicLong();
+
+ private final String name =
MemoryTargetUtil.toUniqueName("DynamicOffHeapSizing");
+ private final SimpleMemoryUsageRecorder recorder = new
SimpleMemoryUsageRecorder();
+
+ public DynamicOffHeapSizingMemoryTarget() {}
+
@Override
public long borrow(long size) {
if (size == 0) {
@@ -47,47 +73,42 @@ public class DynamicOffHeapSizingMemoryTarget implements MemoryTarget {
// See https://github.com/apache/incubator-gluten/issues/9276.
long totalHeapMemory = Runtime.getRuntime().totalMemory();
long freeHeapMemory = Runtime.getRuntime().freeMemory();
-
- long usedOffHeapBytesNow = USED_OFFHEAP_BYTES.get();
+ long usedOffHeapMemory = USED_OFF_HEAP_BYTES.get();
// Adds the total JVM memory which is the actual memory the JVM occupied
from the operating
// system into the counter.
- if (size + usedOffHeapBytesNow + totalHeapMemory > MAX_MEMORY_IN_BYTES) {
+ if (size + usedOffHeapMemory + totalHeapMemory > TOTAL_MEMORY_SHARED) {
LOG.warn(
String.format(
"Failing allocation as unified memory is OOM. "
+ "Used Off-heap: %d, Used On-Heap: %d, "
+ "Free On-heap: %d, Total On-heap: %d, "
+ "Max On-heap: %d, Allocation: %d.",
- usedOffHeapBytesNow,
+ usedOffHeapMemory,
totalHeapMemory - freeHeapMemory,
freeHeapMemory,
totalHeapMemory,
- MAX_MEMORY_IN_BYTES,
+ TOTAL_MEMORY_SHARED,
size));
return 0;
}
- long reserved = delegated.borrow(size);
-
- USED_OFFHEAP_BYTES.addAndGet(reserved);
-
- return reserved;
+ USED_OFF_HEAP_BYTES.addAndGet(size);
+ recorder.inc(size);
+ return size;
}
@Override
public long repay(long size) {
- long unreserved = delegated.repay(size);
-
- USED_OFFHEAP_BYTES.addAndGet(-unreserved);
-
- return unreserved;
+ USED_OFF_HEAP_BYTES.addAndGet(-size);
+ recorder.inc(-size);
+ return size;
}
@Override
public long usedBytes() {
- return delegated.usedBytes();
+ return recorder.current();
}
@Override
@@ -95,7 +116,13 @@ public class DynamicOffHeapSizingMemoryTarget implements MemoryTarget {
return visitor.visit(this);
}
- public MemoryTarget delegated() {
- return delegated;
+ @Override
+ public String name() {
+ return name;
+ }
+
+ @Override
+ public MemoryUsageStats stats() {
+ return recorder.toStats();
}
}
diff --git a/gluten-core/src/main/java/org/apache/gluten/memory/memtarget/MemoryTargets.java b/gluten-core/src/main/java/org/apache/gluten/memory/memtarget/MemoryTargets.java
index 9630af8acc..ff2a4da03e 100644
--- a/gluten-core/src/main/java/org/apache/gluten/memory/memtarget/MemoryTargets.java
+++ b/gluten-core/src/main/java/org/apache/gluten/memory/memtarget/MemoryTargets.java
@@ -51,7 +51,7 @@ public final class MemoryTargets {
@Experimental
public static MemoryTarget dynamicOffHeapSizingIfEnabled(MemoryTarget
memoryTarget) {
if (GlutenConfig.get().dynamicOffHeapSizingEnabled()) {
- return new DynamicOffHeapSizingMemoryTarget(memoryTarget);
+ return new DynamicOffHeapSizingMemoryTarget();
}
return memoryTarget;
diff --git a/gluten-core/src/main/scala/org/apache/gluten/GlutenPlugin.scala b/gluten-core/src/main/scala/org/apache/gluten/GlutenPlugin.scala
index 8a539d44e4..ff5ae59eef 100644
--- a/gluten-core/src/main/scala/org/apache/gluten/GlutenPlugin.scala
+++ b/gluten-core/src/main/scala/org/apache/gluten/GlutenPlugin.scala
@@ -188,74 +188,25 @@ private[gluten] class GlutenDriverPlugin extends DriverPlugin with Logging {
// check memory off-heap enabled and size.
checkOffHeapSettings(conf)
- // Task slots.
- val taskSlots = SparkResourceUtil.getTaskSlots(conf)
- conf.set(NUM_TASK_SLOTS_PER_EXECUTOR.key, taskSlots.toString)
-
- val onHeapSize: Long = conf.getSizeAsBytes(SPARK_ONHEAP_SIZE_KEY, 1024 *
1024 * 1024)
-
- // If dynamic off-heap sizing is enabled, the off-heap size is calculated
based on the on-heap
- // size. Otherwise, the off-heap size is set to the value specified by the
user (if any).
- // Note that this means that we will IGNORE the off-heap size specified by
the user if the
- // dynamic off-heap feature is enabled.
- val offHeapSize: Long =
- if (
- conf.getBoolean(
- DYNAMIC_OFFHEAP_SIZING_ENABLED.key,
- DYNAMIC_OFFHEAP_SIZING_ENABLED.defaultValue.get)
- ) {
- // Since when dynamic off-heap sizing is enabled, we commingle on-heap
- // and off-heap memory, we set the off-heap size to the usable on-heap
size. We will
- // size it with a memory fraction, which can be aggressively set, but
the default
- // is using the same way that Spark sizes on-heap memory:
- //
- // spark.gluten.memory.dynamic.offHeap.sizing.memory.fraction *
- // (spark.executor.memory - 300MB).
- //
- // We will be careful to use the same configuration settings as Spark
to ensure
- // that we are sizing the off-heap memory in the same way as Spark
sizes on-heap memory.
- // The 300MB value, unfortunately, is hard-coded in Spark code.
- ((onHeapSize - (300 * 1024 * 1024)) *
- conf.getDouble(DYNAMIC_OFFHEAP_SIZING_MEMORY_FRACTION.key,
0.6d)).toLong
- } else {
- // Optimistic off-heap sizes, assuming all storage memory can be
borrowed into execution
- // memory pool, regardless of Spark option
spark.memory.storageFraction.
- conf.getSizeAsBytes(SPARK_OFFHEAP_SIZE_KEY, 0L)
- }
+ // Get the off-heap size set by user.
+ val offHeapSize = conf.getSizeAsBytes(SPARK_OFFHEAP_SIZE_KEY)
+ // Set off-heap size in bytes.
conf.set(COLUMNAR_OFFHEAP_SIZE_IN_BYTES.key, offHeapSize.toString)
- conf.set(SPARK_OFFHEAP_SIZE_KEY, offHeapSize.toString)
+ // Set off-heap size in bytes per task.
+ val taskSlots = SparkResourceUtil.getTaskSlots(conf)
+ conf.set(NUM_TASK_SLOTS_PER_EXECUTOR.key, taskSlots.toString)
val offHeapPerTask = offHeapSize / taskSlots
conf.set(COLUMNAR_TASK_OFFHEAP_SIZE_IN_BYTES.key, offHeapPerTask.toString)
- // If we are using dynamic off-heap sizing, we should also enable off-heap
memory
- // officially.
- if (
- conf.getBoolean(
- DYNAMIC_OFFHEAP_SIZING_ENABLED.key,
- DYNAMIC_OFFHEAP_SIZING_ENABLED.defaultValue.get)
- ) {
- conf.set(SPARK_OFFHEAP_ENABLED, "true")
-
- // We already sized the off-heap per task in a conservative manner, so
we can just
- // use it.
- conf.set(COLUMNAR_CONSERVATIVE_TASK_OFFHEAP_SIZE_IN_BYTES.key,
offHeapPerTask.toString)
- } else {
- // Let's make sure this is set to false explicitly if it is not on as it
- // is looked up when throwing OOF exceptions.
- conf.set(
- DYNAMIC_OFFHEAP_SIZING_ENABLED.key,
- DYNAMIC_OFFHEAP_SIZING_ENABLED.defaultValueString)
-
- // Pessimistic off-heap sizes, with the assumption that all
non-borrowable storage memory
- // determined by spark.memory.storageFraction was used.
- val fraction = 1.0d - conf.getDouble("spark.memory.storageFraction",
0.5d)
- val conservativeOffHeapPerTask = (offHeapSize * fraction).toLong /
taskSlots
- conf.set(
- COLUMNAR_CONSERVATIVE_TASK_OFFHEAP_SIZE_IN_BYTES.key,
- conservativeOffHeapPerTask.toString)
- }
+ // Pessimistic off-heap sizes, with the assumption that all non-borrowable
storage memory
+ // determined by spark.memory.storageFraction was used.
+ val fraction = 1.0d - conf.getDouble("spark.memory.storageFraction", 0.5d)
+ val conservativeOffHeapPerTask = (offHeapSize * fraction).toLong /
taskSlots
+ conf.set(
+ COLUMNAR_CONSERVATIVE_TASK_OFFHEAP_SIZE_IN_BYTES.key,
+ conservativeOffHeapPerTask.toString)
// Disable vanilla columnar readers, to prevent columnar-to-columnar
conversions.
// FIXME: Do we still need this trick since
diff --git a/gluten-core/src/main/scala/org/apache/spark/memory/SparkMemoryUtil.scala b/gluten-core/src/main/scala/org/apache/spark/memory/SparkMemoryUtil.scala
index b0322fecd4..ae287cef3f 100644
--- a/gluten-core/src/main/scala/org/apache/spark/memory/SparkMemoryUtil.scala
+++ b/gluten-core/src/main/scala/org/apache/spark/memory/SparkMemoryUtil.scala
@@ -129,7 +129,9 @@ object SparkMemoryUtil {
override def visit(
dynamicOffHeapSizingMemoryTarget: DynamicOffHeapSizingMemoryTarget):
String = {
- dynamicOffHeapSizingMemoryTarget.delegated().accept(this)
+ prettyPrintStats(
+ "Dynamic off-heap sizing memory target stats: ",
+ dynamicOffHeapSizingMemoryTarget)
}
override def visit(retryOnOomMemoryTarget: RetryOnOomMemoryTarget):
String = {
diff --git a/shims/common/src/main/scala/org/apache/gluten/config/GlutenConfig.scala b/shims/common/src/main/scala/org/apache/gluten/config/GlutenConfig.scala
index f19ef0833a..449db38e24 100644
--- a/shims/common/src/main/scala/org/apache/gluten/config/GlutenConfig.scala
+++ b/shims/common/src/main/scala/org/apache/gluten/config/GlutenConfig.scala
@@ -342,6 +342,9 @@ class GlutenConfig(conf: SQLConf) extends Logging {
def dynamicOffHeapSizingEnabled: Boolean =
getConf(DYNAMIC_OFFHEAP_SIZING_ENABLED)
+ def dynamicOffHeapSizingMemoryFraction: Double =
+ getConf(DYNAMIC_OFFHEAP_SIZING_MEMORY_FRACTION)
+
def enableHiveFileFormatWriter: Boolean =
getConf(NATIVE_HIVEFILEFORMAT_WRITER_ENABLED)
def enableCelebornFallback: Boolean = getConf(CELEBORN_FALLBACK_ENABLED)
@@ -1244,7 +1247,7 @@ object GlutenConfig {
.createWithDefault(false)
val COLUMNAR_MEMORY_UNTRACKED =
- buildConf("spark.gluten.memory.untracked")
+ buildStaticConf("spark.gluten.memory.untracked")
.internal()
.doc(
"When enabled, turn all native memory allocations in Gluten into
untracked. Spark " +
@@ -1634,7 +1637,7 @@ object GlutenConfig {
.createWithDefault(true)
val DYNAMIC_OFFHEAP_SIZING_ENABLED =
- buildConf("spark.gluten.memory.dynamic.offHeap.sizing.enabled")
+ buildStaticConf("spark.gluten.memory.dynamic.offHeap.sizing.enabled")
.internal()
.doc(
"Experimental: When set to true, the offheap config
(spark.memory.offHeap.size) will " +
@@ -1651,7 +1654,7 @@ object GlutenConfig {
.createWithDefault(false)
val DYNAMIC_OFFHEAP_SIZING_MEMORY_FRACTION =
- buildConf("spark.gluten.memory.dynamic.offHeap.sizing.memory.fraction")
+
buildStaticConf("spark.gluten.memory.dynamic.offHeap.sizing.memory.fraction")
.internal()
.doc(
"Experimental: Determines the memory fraction used to determine the
total " +
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]