This is an automated email from the ASF dual-hosted git repository.
marin-ma pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/gluten.git
The following commit(s) were added to refs/heads/main by this push:
new 0b1e5115e5 [VL] Reduce Velox hash shuffle partition buffer memory by
evicting large partitions after split (#12089)
0b1e5115e5 is described below
commit 0b1e5115e5a2f9a05ad9f898f42b81eb42cb87ee
Author: wankun <[email protected]>
AuthorDate: Tue May 19 16:57:41 2026 +0800
[VL] Reduce Velox hash shuffle partition buffer memory by evicting large
partitions after split (#12089)
---
.../VeloxCelebornColumnarShuffleWriter.scala | 1 +
.../writer/VeloxUniffleColumnarShuffleWriter.java | 1 +
.../spark/shuffle/ColumnarShuffleWriter.scala | 1 +
cpp/core/jni/JniWrapper.cc | 4 ++-
cpp/core/shuffle/Options.h | 14 +++++++---
cpp/core/shuffle/Payload.cc | 2 +-
cpp/velox/shuffle/VeloxHashShuffleWriter.cc | 32 ++++++++++++++++++++++
cpp/velox/shuffle/VeloxHashShuffleWriter.h | 6 +++-
docs/Configuration.md | 1 +
.../gluten/vectorized/ShuffleWriterJniWrapper.java | 1 +
.../org/apache/gluten/config/GlutenConfig.scala | 11 ++++++++
11 files changed, 67 insertions(+), 7 deletions(-)
diff --git
a/backends-velox/src-celeborn/main/scala/org/apache/spark/shuffle/VeloxCelebornColumnarShuffleWriter.scala
b/backends-velox/src-celeborn/main/scala/org/apache/spark/shuffle/VeloxCelebornColumnarShuffleWriter.scala
index 783d68a00c..03208adbcf 100644
---
a/backends-velox/src-celeborn/main/scala/org/apache/spark/shuffle/VeloxCelebornColumnarShuffleWriter.scala
+++
b/backends-velox/src-celeborn/main/scala/org/apache/spark/shuffle/VeloxCelebornColumnarShuffleWriter.scala
@@ -150,6 +150,7 @@ class VeloxCelebornColumnarShuffleWriter[K, V](
GlutenShuffleUtils.getStartPartitionId(dep.nativePartitioning,
context.partitionId),
nativeBufferSize,
GlutenConfig.get.columnarShuffleReallocThreshold,
+ GlutenConfig.get.columnarShufflePartitionBufferEvictThreshold,
partitionWriterHandle
)
case SortShuffleWriterType =>
diff --git
a/backends-velox/src-uniffle/main/java/org/apache/spark/shuffle/writer/VeloxUniffleColumnarShuffleWriter.java
b/backends-velox/src-uniffle/main/java/org/apache/spark/shuffle/writer/VeloxUniffleColumnarShuffleWriter.java
index ef35818c7b..e01f97ba3e 100644
---
a/backends-velox/src-uniffle/main/java/org/apache/spark/shuffle/writer/VeloxUniffleColumnarShuffleWriter.java
+++
b/backends-velox/src-uniffle/main/java/org/apache/spark/shuffle/writer/VeloxUniffleColumnarShuffleWriter.java
@@ -185,6 +185,7 @@ public class VeloxUniffleColumnarShuffleWriter<K, V>
extends RssShuffleWriter<K,
columnarDep.nativePartitioning(), partitionId),
nativeBufferSize,
reallocThreshold,
+
GlutenConfig.get().columnarShufflePartitionBufferEvictThreshold(),
partitionWriterHandle);
}
diff --git
a/backends-velox/src/main/scala/org/apache/spark/shuffle/ColumnarShuffleWriter.scala
b/backends-velox/src/main/scala/org/apache/spark/shuffle/ColumnarShuffleWriter.scala
index 01f4bd06ba..ff1c66e037 100644
---
a/backends-velox/src/main/scala/org/apache/spark/shuffle/ColumnarShuffleWriter.scala
+++
b/backends-velox/src/main/scala/org/apache/spark/shuffle/ColumnarShuffleWriter.scala
@@ -192,6 +192,7 @@ class ColumnarShuffleWriter[K, V](
taskContext.partitionId),
nativeBufferSize,
reallocThreshold,
+ GlutenConfig.get.columnarShufflePartitionBufferEvictThreshold,
partitionWriterHandle
)
}
diff --git a/cpp/core/jni/JniWrapper.cc b/cpp/core/jni/JniWrapper.cc
index f109865840..46b9d7603c 100644
--- a/cpp/core/jni/JniWrapper.cc
+++ b/cpp/core/jni/JniWrapper.cc
@@ -990,6 +990,7 @@ JNIEXPORT jlong JNICALL
Java_org_apache_gluten_vectorized_ShuffleWriterJniWrappe
jint startPartitionId,
jint splitBufferSize,
jdouble splitBufferReallocThreshold,
+ jint partitionBufferEvictThreshold,
jlong partitionWriterHandle) {
JNI_METHOD_START
const auto ctx = getRuntime(env, wrapper);
@@ -1004,7 +1005,8 @@ JNIEXPORT jlong JNICALL
Java_org_apache_gluten_vectorized_ShuffleWriterJniWrappe
toPartitioning(jStringToCString(env, partitioningNameJstr)),
startPartitionId,
splitBufferSize,
- splitBufferReallocThreshold);
+ splitBufferReallocThreshold,
+ partitionBufferEvictThreshold);
return ctx->saveObject(ctx->createShuffleWriter(numPartitions,
partitionWriter, shuffleWriterOptions));
JNI_METHOD_END(kInvalidObjectHandle)
diff --git a/cpp/core/shuffle/Options.h b/cpp/core/shuffle/Options.h
index ea3aff10cf..649a164774 100644
--- a/cpp/core/shuffle/Options.h
+++ b/cpp/core/shuffle/Options.h
@@ -27,6 +27,7 @@
namespace gluten {
static constexpr int16_t kDefaultBatchSize = 4096;
+static constexpr int32_t kDefaultPartitionBufferEvictThreshold = -1;
static constexpr int32_t kDefaultShuffleWriterBufferSize = 4096;
static constexpr int64_t kDefaultSortBufferThreshold = 64 << 20;
static constexpr int64_t kDefaultPushMemoryThreshold = 4096;
@@ -85,6 +86,7 @@ struct ShuffleWriterOptions {
struct HashShuffleWriterOptions : ShuffleWriterOptions {
int32_t splitBufferSize = kDefaultShuffleWriterBufferSize;
double splitBufferReallocThreshold = kDefaultSplitBufferReallocThreshold;
+ int32_t partitionBufferEvictThreshold =
kDefaultPartitionBufferEvictThreshold;
HashShuffleWriterOptions() :
ShuffleWriterOptions(ShuffleWriterType::kHashShuffle) {}
@@ -92,10 +94,12 @@ struct HashShuffleWriterOptions : ShuffleWriterOptions {
Partitioning partitioning,
int32_t startPartitionId,
int32_t partitionBufferSize,
- double partitionBufferReallocThreshold)
+ double partitionBufferReallocThreshold,
+ int32_t partitionBufferEvictThreshold =
kDefaultPartitionBufferEvictThreshold)
: ShuffleWriterOptions(ShuffleWriterType::kHashShuffle, partitioning,
startPartitionId),
splitBufferSize(partitionBufferSize),
- splitBufferReallocThreshold(partitionBufferReallocThreshold) {}
+ splitBufferReallocThreshold(partitionBufferReallocThreshold),
+ partitionBufferEvictThreshold(partitionBufferEvictThreshold) {}
protected:
HashShuffleWriterOptions(ShuffleWriterType shuffleWriterType) :
ShuffleWriterOptions(shuffleWriterType) {}
@@ -105,10 +109,12 @@ struct HashShuffleWriterOptions : ShuffleWriterOptions {
Partitioning partitioning,
int32_t startPartitionId,
int32_t partitionBufferSize,
- double partitionBufferReallocThreshold)
+ double partitionBufferReallocThreshold,
+ int32_t partitionBufferEvictThreshold =
kDefaultPartitionBufferEvictThreshold)
: ShuffleWriterOptions(shuffleWriterType, partitioning,
startPartitionId),
splitBufferSize(partitionBufferSize),
- splitBufferReallocThreshold(partitionBufferReallocThreshold) {}
+ splitBufferReallocThreshold(partitionBufferReallocThreshold),
+ partitionBufferEvictThreshold(partitionBufferEvictThreshold) {}
};
struct SortShuffleWriterOptions : ShuffleWriterOptions {
diff --git a/cpp/core/shuffle/Payload.cc b/cpp/core/shuffle/Payload.cc
index 230806cd2f..5d02a84483 100644
--- a/cpp/core/shuffle/Payload.cc
+++ b/cpp/core/shuffle/Payload.cc
@@ -60,7 +60,7 @@ arrow::Result<uint8_t>
readPayloadType(arrow::io::InputStream* is) {
}
arrow::Result<int64_t> compressBuffer(
- const std::shared_ptr<arrow::Buffer>& buffer,
+ const std::shared_ptr<arrow::Buffer> buffer,
uint8_t* output,
int64_t outputLength,
arrow::util::Codec* codec) {
diff --git a/cpp/velox/shuffle/VeloxHashShuffleWriter.cc
b/cpp/velox/shuffle/VeloxHashShuffleWriter.cc
index e071e8a1c3..89f1dbcda6 100644
--- a/cpp/velox/shuffle/VeloxHashShuffleWriter.cc
+++ b/cpp/velox/shuffle/VeloxHashShuffleWriter.cc
@@ -441,9 +441,41 @@ arrow::Status VeloxHashShuffleWriter::doSplit(const
facebook::velox::RowVector&
printPartitionBuffer();
setSplitState(SplitState::kInit);
+ if (partitionBufferEvictThreshold_ > 0) {
+ // After split, evict large partition buffers to free up memory for the
next input RowVector.
+ const auto partitionBytes = estimatePartitionBufferBytes();
+ for (uint32_t pid = 0; pid < partitionBytes.size(); ++pid) {
+ if (partitionBufferBase_[pid] > 0 && partitionBytes[pid] >=
partitionBufferEvictThreshold_) {
+ RETURN_NOT_OK(evictPartitionBuffers(pid, false));
+ }
+ }
+ }
return arrow::Status::OK();
}
+std::vector<int64_t> VeloxHashShuffleWriter::estimatePartitionBufferBytes()
const { // Returns one byte total per partition id: the summed capacity() of every non-null buffer held for that partition.
+ std::vector<int64_t> partitionBytes(numPartitions_, 0); // numPartitions_ entries, each starting at 0
+
+ for (const auto& columnBuffers : partitionBuffers_) { // each element is indexed by partition id below
+ for (uint32_t pid = 0; pid < columnBuffers.size(); ++pid) { // NOTE(review): assumes columnBuffers.size() <= numPartitions_ - confirm
+ for (const auto& buffer : columnBuffers[pid]) {
+ if (buffer) { // null buffer pointers contribute nothing
+ partitionBytes[pid] += buffer->capacity(); // capacity(), not size(): presumably allocated rather than used bytes - TODO confirm
+ }
+ }
+ }
+ }
+
+ for (uint32_t pid = 0; pid < complexTypeFlushBuffer_.size(); ++pid) { // add the per-partition entry of complexTypeFlushBuffer_
+ const auto& buffer = complexTypeFlushBuffer_[pid];
+ if (buffer) { // partitions with no flush buffer are skipped
+ partitionBytes[pid] += buffer->capacity();
+ }
+ }
+
+ return partitionBytes; // callers index the result by partition id
+}
+
arrow::Status VeloxHashShuffleWriter::splitRowVector(const
facebook::velox::RowVector& rv) {
SCOPED_TIMER(cpuWallTimingList_[CpuWallTimingSplitRV]);
diff --git a/cpp/velox/shuffle/VeloxHashShuffleWriter.h
b/cpp/velox/shuffle/VeloxHashShuffleWriter.h
index 899c2be269..d2901019b7 100644
--- a/cpp/velox/shuffle/VeloxHashShuffleWriter.h
+++ b/cpp/velox/shuffle/VeloxHashShuffleWriter.h
@@ -278,7 +278,8 @@ class VeloxHashShuffleWriter : public VeloxShuffleWriter {
MemoryManager* memoryManager)
: VeloxShuffleWriter(numPartitions, partitionWriter, options,
memoryManager),
splitBufferSize_(options->splitBufferSize),
- splitBufferReallocThreshold_(options->splitBufferReallocThreshold) {
+ splitBufferReallocThreshold_(options->splitBufferReallocThreshold),
+ partitionBufferEvictThreshold_(options->partitionBufferEvictThreshold)
{
arenas_.resize(numPartitions);
}
@@ -287,6 +288,8 @@ class VeloxHashShuffleWriter : public VeloxShuffleWriter {
arrow::Status initColumnTypes(const facebook::velox::RowVector& rv);
+ std::vector<int64_t> estimatePartitionBufferBytes() const;
+
arrow::Status splitRowVector(const facebook::velox::RowVector& rv);
arrow::Status initFromRowVector(const facebook::velox::RowVector& rv);
@@ -396,6 +399,7 @@ class VeloxHashShuffleWriter : public VeloxShuffleWriter {
protected:
int32_t splitBufferSize_;
double splitBufferReallocThreshold_;
+ int32_t partitionBufferEvictThreshold_;
std::shared_ptr<arrow::Schema> schema_;
diff --git a/docs/Configuration.md b/docs/Configuration.md
index 9a7cc8af8c..1bd4dce5b4 100644
--- a/docs/Configuration.md
+++ b/docs/Configuration.md
@@ -95,6 +95,7 @@ nav_order: 15
| spark.gluten.sql.columnar.shuffle.compression.threshold | 100
| If number of rows in a batch falls below this threshold, will copy
all buffers into one buffer to compress.
[...]
| spark.gluten.sql.columnar.shuffle.dictionary.enabled | false
| Enable dictionary in hash-based shuffle.
[...]
| spark.gluten.sql.columnar.shuffle.merge.threshold | 0.25
|
+| spark.gluten.sql.columnar.shuffle.partitionBufferEvictThreshold | -1
| For Velox hash shuffle writer, evict partition buffers larger than
this threshold after splitting an input batch. Use non-positive value to
disable this feature.
[...]
| spark.gluten.sql.columnar.shuffle.readerBufferSize | 1MB
| Buffer size in bytes for shuffle reader reading input stream from
local or remote.
[...]
| spark.gluten.sql.columnar.shuffle.realloc.threshold | 0.25
|
| spark.gluten.sql.columnar.shuffle.sort.columns.threshold | 100000
| The threshold to determine whether to use sort-based columnar
shuffle. Sort-based shuffle will be used if the number of columns is greater
than this threshold.
[...]
diff --git
a/gluten-arrow/src/main/java/org/apache/gluten/vectorized/ShuffleWriterJniWrapper.java
b/gluten-arrow/src/main/java/org/apache/gluten/vectorized/ShuffleWriterJniWrapper.java
index a389da6860..87685f8505 100644
---
a/gluten-arrow/src/main/java/org/apache/gluten/vectorized/ShuffleWriterJniWrapper.java
+++
b/gluten-arrow/src/main/java/org/apache/gluten/vectorized/ShuffleWriterJniWrapper.java
@@ -43,6 +43,7 @@ public class ShuffleWriterJniWrapper implements RuntimeAware {
int startPartitionId,
int splitBufferSize,
double splitBufferReallocThreshold,
+ int partitionBufferEvictThreshold,
long partitionWriterHandle);
public native long createSortShuffleWriter(
diff --git
a/gluten-substrait/src/main/scala/org/apache/gluten/config/GlutenConfig.scala
b/gluten-substrait/src/main/scala/org/apache/gluten/config/GlutenConfig.scala
index 0bbbcead63..f7ac297dec 100644
---
a/gluten-substrait/src/main/scala/org/apache/gluten/config/GlutenConfig.scala
+++
b/gluten-substrait/src/main/scala/org/apache/gluten/config/GlutenConfig.scala
@@ -225,6 +225,9 @@ class GlutenConfig(conf: SQLConf) extends
GlutenCoreConfig(conf) {
def columnarShuffleReallocThreshold: Double =
getConf(COLUMNAR_SHUFFLE_REALLOC_THRESHOLD)
+ def columnarShufflePartitionBufferEvictThreshold: Int =
+ getConf(COLUMNAR_SHUFFLE_PARTITION_BUFFER_EVICT_THRESHOLD)
+
def columnarShuffleMergeThreshold: Double =
getConf(SHUFFLE_WRITER_MERGE_THRESHOLD)
def columnarShuffleCodec: Option[String] = getConf(COLUMNAR_SHUFFLE_CODEC)
@@ -1074,6 +1077,14 @@ object GlutenConfig extends ConfigRegistry {
.checkValue(v => v >= 0 && v <= 1, "Buffer reallocation threshold must
between [0, 1]")
.createWithDefault(0.25)
+ val COLUMNAR_SHUFFLE_PARTITION_BUFFER_EVICT_THRESHOLD =
+
buildConf("spark.gluten.sql.columnar.shuffle.partitionBufferEvictThreshold")
+ .doc(
+ "For Velox hash shuffle writer, evict partition buffers larger than
this threshold " +
+ "after splitting an input batch. Use non-positive value to disable
this feature.")
+ .intConf
+ .createWithDefault(-1)
+
val COLUMNAR_SHUFFLE_CODEC =
buildConf("spark.gluten.sql.columnar.shuffle.codec")
.doc(
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@gluten.apache.org
For additional commands, e-mail: commits-help@gluten.apache.org