This is an automated email from the ASF dual-hosted git repository.
marin-ma pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/gluten.git
The following commit(s) were added to refs/heads/main by this push:
new ad03c2d991 [VL] Restore hash shuffle reader payload merging (#12097)
ad03c2d991 is described below
commit ad03c2d9912ee3dbbe51f93817a8f87f71cfa458
Author: Zhen Li <[email protected]>
AuthorDate: Tue May 19 03:22:15 2026 +0800
[VL] Restore hash shuffle reader payload merging (#12097)
---
.../VeloxCelebornColumnarBatchSerializer.scala | 6 +-
.../org/apache/gluten/config/VeloxConfig.scala | 17 ++
.../vectorized/ColumnarBatchSerializer.scala | 7 +-
cpp/core/jni/JniWrapper.cc | 4 +-
cpp/core/shuffle/Options.h | 4 +
cpp/velox/compute/VeloxRuntime.cc | 3 +-
cpp/velox/shuffle/VeloxShuffleReader.cc | 173 +++++++++--
cpp/velox/shuffle/VeloxShuffleReader.h | 16 +-
cpp/velox/tests/VeloxShuffleWriterTest.cc | 322 +++++++++++++++++++++
docs/velox-configuration.md | 143 ++++-----
.../gluten/vectorized/ShuffleReaderJniWrapper.java | 3 +-
11 files changed, 596 insertions(+), 102 deletions(-)
diff --git a/backends-velox/src-celeborn/main/scala/org/apache/spark/shuffle/VeloxCelebornColumnarBatchSerializer.scala b/backends-velox/src-celeborn/main/scala/org/apache/spark/shuffle/VeloxCelebornColumnarBatchSerializer.scala
index f377ea99f6..2c36c773f4 100644
--- a/backends-velox/src-celeborn/main/scala/org/apache/spark/shuffle/VeloxCelebornColumnarBatchSerializer.scala
+++ b/backends-velox/src-celeborn/main/scala/org/apache/spark/shuffle/VeloxCelebornColumnarBatchSerializer.scala
@@ -17,7 +17,7 @@
package org.apache.spark.shuffle
import org.apache.gluten.backendsapi.BackendsApiManager
-import org.apache.gluten.config.{GlutenConfig, ShuffleWriterType}
+import org.apache.gluten.config.{GlutenConfig, ShuffleWriterType, VeloxConfig}
import org.apache.gluten.memory.arrow.alloc.ArrowBufferAllocators
import org.apache.gluten.runtime.Runtimes
import org.apache.gluten.utils.ArrowAbiUtil
@@ -95,6 +95,7 @@ private class CelebornColumnarBatchSerializerInstance(
val batchSize = GlutenConfig.get.maxBatchSize
val readerBufferSize = GlutenConfig.get.columnarShuffleReaderBufferSize
val deserializerBufferSize = GlutenConfig.get.columnarSortShuffleDeserializerBufferSize
+ val enableHashShuffleReaderStreamMerge = VeloxConfig.get.enableHashShuffleReaderStreamMerge
val handle = jniWrapper
.make(
cSchema.memoryAddress(),
@@ -103,7 +104,8 @@ private class CelebornColumnarBatchSerializerInstance(
batchSize,
readerBufferSize,
deserializerBufferSize,
- shuffleWriterType.name
+ shuffleWriterType.name,
+ enableHashShuffleReaderStreamMerge
)
// Close shuffle reader instance as lately as the end of task processing,
// since the native reader could hold a reference to memory pool that
diff --git a/backends-velox/src/main/scala/org/apache/gluten/config/VeloxConfig.scala b/backends-velox/src/main/scala/org/apache/gluten/config/VeloxConfig.scala
index dbc833f046..9ea203d42b 100644
--- a/backends-velox/src/main/scala/org/apache/gluten/config/VeloxConfig.scala
+++ b/backends-velox/src/main/scala/org/apache/gluten/config/VeloxConfig.scala
@@ -37,6 +37,9 @@ class VeloxConfig(conf: SQLConf) extends GlutenConfig(conf) {
def veloxResizeBatchesShuffleOutput: Boolean =
getConf(COLUMNAR_VELOX_RESIZE_BATCHES_SHUFFLE_OUTPUT)
+ def enableHashShuffleReaderStreamMerge: Boolean =
+ getConf(COLUMNAR_VELOX_HASH_SHUFFLE_READER_STREAM_MERGE_ENABLED)
+
case class ResizeRange(min: Int, max: Int) {
assert(max >= min)
assert(min > 0, "Min batch size should be larger than 0")
@@ -322,6 +325,20 @@ object VeloxConfig extends ConfigRegistry {
.booleanConf
.createWithDefault(false)
+ val COLUMNAR_VELOX_HASH_SHUFFLE_READER_STREAM_MERGE_ENABLED =
+ buildConf("spark.gluten.sql.columnar.backend.velox.hashShuffle.reader.streamMerge.enabled")
+ .doc(
+ "Enables a reader-side raw payload merge fast path for plain hash shuffle payloads " +
+ "within each shuffle input stream. This path merges payload buffers before Velox " +
+ "vectors are materialized, so it has lower per-batch overhead than generic " +
+ "VeloxResizeBatchesExec resizing, but it only covers plain payloads. Complex types " +
+ "and dictionary-encoded payloads are not merged by this path. " +
+ "VeloxResizeBatchesExec can still be enabled separately as a generic complement " +
+ "for types and encodings not covered by this fast path. If false, each hash " +
+ "shuffle payload is returned as its own columnar batch.")
+ .booleanConf
+ .createWithDefault(false)
+
val COLUMNAR_VELOX_RESIZE_BATCHES_SHUFFLE_INPUT_MIN_SIZE =
buildConf("spark.gluten.sql.columnar.backend.velox.resizeBatches.shuffleInput.minSize")
.doc(
diff --git a/backends-velox/src/main/scala/org/apache/gluten/vectorized/ColumnarBatchSerializer.scala b/backends-velox/src/main/scala/org/apache/gluten/vectorized/ColumnarBatchSerializer.scala
index 3b5fce63f8..284d931ecc 100644
--- a/backends-velox/src/main/scala/org/apache/gluten/vectorized/ColumnarBatchSerializer.scala
+++ b/backends-velox/src/main/scala/org/apache/gluten/vectorized/ColumnarBatchSerializer.scala
@@ -17,7 +17,7 @@
package org.apache.gluten.vectorized
import org.apache.gluten.backendsapi.BackendsApiManager
-import org.apache.gluten.config.{GlutenConfig, ShuffleWriterType}
+import org.apache.gluten.config.{GlutenConfig, ShuffleWriterType, VeloxConfig}
import org.apache.gluten.iterator.ClosableIterator
import org.apache.gluten.memory.arrow.alloc.ArrowBufferAllocators
import org.apache.gluten.runtime.Runtimes
@@ -104,6 +104,7 @@ private class ColumnarBatchSerializerInstanceImpl(
val batchSize = GlutenConfig.get.maxBatchSize
val readerBufferSize = GlutenConfig.get.columnarShuffleReaderBufferSize
val deserializerBufferSize =
GlutenConfig.get.columnarSortShuffleDeserializerBufferSize
+ val enableHashShuffleReaderStreamMerge =
VeloxConfig.get.enableHashShuffleReaderStreamMerge
val shuffleReaderHandle = jniWrapper.make(
cSchema.memoryAddress(),
compressionCodec,
@@ -111,7 +112,9 @@ private class ColumnarBatchSerializerInstanceImpl(
batchSize,
readerBufferSize,
deserializerBufferSize,
- shuffleWriterType.name)
+ shuffleWriterType.name,
+ enableHashShuffleReaderStreamMerge
+ )
// Close shuffle reader instance as lately as the end of task processing,
// since the native reader could hold a reference to memory pool that
// was used to create all buffers read from shuffle reader. The pool
diff --git a/cpp/core/jni/JniWrapper.cc b/cpp/core/jni/JniWrapper.cc
index 4a3215ae66..f109865840 100644
--- a/cpp/core/jni/JniWrapper.cc
+++ b/cpp/core/jni/JniWrapper.cc
@@ -1211,7 +1211,8 @@ JNIEXPORT jlong JNICALL Java_org_apache_gluten_vectorized_ShuffleReaderJniWrappe
jint batchSize,
jlong readerBufferSize,
jlong deserializerBufferSize,
- jstring shuffleWriterType) {
+ jstring shuffleWriterType,
+ jboolean enableHashShuffleReaderStreamMerge) {
JNI_METHOD_START
auto ctx = getRuntime(env, wrapper);
@@ -1223,6 +1224,7 @@ JNIEXPORT jlong JNICALL Java_org_apache_gluten_vectorized_ShuffleReaderJniWrappe
options.batchSize = batchSize;
options.readerBufferSize = readerBufferSize;
options.deserializerBufferSize = deserializerBufferSize;
+ options.enableHashShuffleReaderStreamMerge =
enableHashShuffleReaderStreamMerge;
options.shuffleWriterType =
ShuffleWriter::stringToType(jStringToCString(env, shuffleWriterType));
std::shared_ptr<arrow::Schema> schema =
diff --git a/cpp/core/shuffle/Options.h b/cpp/core/shuffle/Options.h
index 1d7f9ad9f9..ea3aff10cf 100644
--- a/cpp/core/shuffle/Options.h
+++ b/cpp/core/shuffle/Options.h
@@ -63,6 +63,10 @@ struct ShuffleReaderOptions {
// Buffer size when deserializing rows into columnar batches. Only used for
sort-based shuffle.
int64_t deserializerBufferSize = kDefaultDeserializerBufferSize;
+
+ // Whether to enable the reader-side raw payload merge fast path for plain
hash shuffle payloads within one input
+ // stream.
+ bool enableHashShuffleReaderStreamMerge = false;
};
struct ShuffleWriterOptions {
diff --git a/cpp/velox/compute/VeloxRuntime.cc b/cpp/velox/compute/VeloxRuntime.cc
index 1e2cc6f308..e3eac17a22 100644
--- a/cpp/velox/compute/VeloxRuntime.cc
+++ b/cpp/velox/compute/VeloxRuntime.cc
@@ -585,7 +585,8 @@ std::shared_ptr<ShuffleReader>
VeloxRuntime::createShuffleReader(
options.readerBufferSize,
options.deserializerBufferSize,
memoryManager(),
- options.shuffleWriterType);
+ options.shuffleWriterType,
+ options.enableHashShuffleReaderStreamMerge);
return std::make_shared<VeloxShuffleReader>(std::move(deserializerFactory));
}
diff --git a/cpp/velox/shuffle/VeloxShuffleReader.cc b/cpp/velox/shuffle/VeloxShuffleReader.cc
index bb2010378e..a469d5c770 100644
--- a/cpp/velox/shuffle/VeloxShuffleReader.cc
+++ b/cpp/velox/shuffle/VeloxShuffleReader.cc
@@ -56,6 +56,11 @@ arrow::Result<BlockType>
readBlockType(arrow::io::InputStream* inputStream) {
return type;
}
+uint32_t validateHashShuffleReaderBatchSize(int32_t batchSize) {
+ GLUTEN_CHECK(batchSize > 0, fmt::format("Hash shuffle reader batch size must
be positive, but got {}", batchSize));
+ return static_cast<uint32_t>(batchSize);
+}
+
struct BufferViewReleaser {
BufferViewReleaser() : BufferViewReleaser(nullptr) {}
@@ -300,6 +305,23 @@ std::shared_ptr<VeloxColumnarBatch> makeColumnarBatch(
return std::make_shared<VeloxColumnarBatch>(std::move(rowVector));
}
+std::shared_ptr<VeloxColumnarBatch> makeColumnarBatch(
+ RowTypePtr type,
+ std::unique_ptr<InMemoryPayload> payload,
+ memory::MemoryPool* pool,
+ int64_t& deserializeTime) {
+ ScopedTimer timer(&deserializeTime);
+ std::vector<BufferPtr> veloxBuffers;
+ auto numBuffers = payload->numBuffers();
+ veloxBuffers.reserve(numBuffers);
+ for (size_t i = 0; i < numBuffers; ++i) {
+ GLUTEN_ASSIGN_OR_THROW(auto buffer, payload->readBufferAt(i));
+ veloxBuffers.push_back(convertToVeloxBuffer(std::move(buffer)));
+ }
+ auto rowVector = deserialize(type, payload->numRows(), veloxBuffers, {}, {},
pool);
+ return std::make_shared<VeloxColumnarBatch>(std::move(rowVector));
+}
+
arrow::Result<BufferPtr>
readDictionaryBuffer(arrow::io::InputStream* in,
facebook::velox::memory::MemoryPool* pool, arrow::util::Codec* codec) {
size_t bufferSize;
@@ -444,23 +466,45 @@
VeloxHashShuffleReaderDeserializer::VeloxHashShuffleReaderDeserializer(
const std::shared_ptr<arrow::Schema>& schema,
const std::shared_ptr<arrow::util::Codec>& codec,
const facebook::velox::RowTypePtr& rowType,
+ int32_t batchSize,
int64_t readerBufferSize,
VeloxMemoryManager* memoryManager,
+ std::vector<bool> isValidityBuffer,
+ bool hasComplexType,
+ bool enableStreamMerge,
int64_t& deserializeTime,
int64_t& decompressTime)
: streamReader_(streamReader),
schema_(schema),
codec_(codec),
rowType_(rowType),
+ batchSize_(validateHashShuffleReaderBatchSize(batchSize)),
readerBufferSize_(readerBufferSize),
memoryManager_(memoryManager),
+ isValidityBuffer_(std::move(isValidityBuffer)),
+ hasComplexType_(hasComplexType),
+ enableStreamMerge_(enableStreamMerge),
deserializeTime_(deserializeTime),
decompressTime_(decompressTime) {}
+bool VeloxHashShuffleReaderDeserializer::shouldSkipMerge() const {
+ // Stream merge is a reader-side raw payload fast path: for plain payloads it
+ // concatenates buffers before Velox vectors are materialized, avoiding the
generic
+ // RowVector append cost paid by VeloxResizeBatchesExec. Keep complex and
dictionary
+ // payloads on the existing per-payload path; VeloxResizeBatchesExec can be
enabled
+ // separately as the generic complement for those cases.
+ return !enableStreamMerge_ || hasComplexType_ || !dictionaryFields_.empty();
+}
+
bool VeloxHashShuffleReaderDeserializer::resolveNextBlockType() {
+ if (blockTypeResolved_) {
+ return true;
+ }
+
GLUTEN_ASSIGN_OR_THROW(auto blockType, readBlockType(in_.get()));
switch (blockType) {
case BlockType::kEndOfStream:
+ in_ = nullptr;
return false;
case BlockType::kDictionary: {
VeloxDictionaryReader reader(rowType_,
memoryManager_->getLeafMemoryPool().get(), codec_.get());
@@ -485,6 +529,7 @@ bool
VeloxHashShuffleReaderDeserializer::resolveNextBlockType() {
default:
throw GlutenException(fmt::format("Unsupported block type: {}",
static_cast<int32_t>(blockType)));
}
+ blockTypeResolved_ = true;
return true;
}
@@ -499,6 +544,12 @@ void VeloxHashShuffleReaderDeserializer::loadNextStream() {
return;
}
+ if (!dictionaryFields_.empty() || !dictionaries_.empty()) {
+ dictionaryFields_.clear();
+ dictionaries_.clear();
+ }
+ blockTypeResolved_ = false;
+
if (readerBufferSize_ > 0) {
GLUTEN_ASSIGN_OR_THROW(
in_,
@@ -510,36 +561,106 @@ void
VeloxHashShuffleReaderDeserializer::loadNextStream() {
}
std::shared_ptr<ColumnarBatch> VeloxHashShuffleReaderDeserializer::next() {
- if (in_ == nullptr) {
- loadNextStream();
+ while (true) {
+ if (in_ == nullptr) {
+ if (merged_) {
+ return makeColumnarBatch(
+ rowType_, std::move(merged_),
memoryManager_->getLeafMemoryPool().get(), deserializeTime_);
+ }
- if (reachedEos_) {
- return nullptr;
+ loadNextStream();
+
+ if (reachedEos_) {
+ return nullptr;
+ }
+ }
+ if (resolveNextBlockType()) {
+ break;
}
}
- while (!resolveNextBlockType()) {
- loadNextStream();
-
- if (reachedEos_) {
- return nullptr;
+ if (shouldSkipMerge()) {
+ if (merged_) {
+ return makeColumnarBatch(
+ rowType_, std::move(merged_),
memoryManager_->getLeafMemoryPool().get(), deserializeTime_);
}
+
+ uint32_t numRows = 0;
+ GLUTEN_ASSIGN_OR_THROW(
+ auto arrowBuffers,
+ BlockPayload::deserialize(
+ in_.get(), codec_, memoryManager_->defaultArrowMemoryPool(),
numRows, deserializeTime_, decompressTime_));
+
+ blockTypeResolved_ = false;
+
+ return makeColumnarBatch(
+ rowType_,
+ numRows,
+ std::move(arrowBuffers),
+ dictionaryFields_,
+ dictionaries_,
+ memoryManager_->getLeafMemoryPool().get(),
+ deserializeTime_);
}
+ std::vector<std::shared_ptr<arrow::Buffer>> arrowBuffers{};
uint32_t numRows = 0;
- GLUTEN_ASSIGN_OR_THROW(
- auto arrowBuffers,
- BlockPayload::deserialize(
- in_.get(), codec_, memoryManager_->defaultArrowMemoryPool(),
numRows, deserializeTime_, decompressTime_));
+ while (!merged_ || merged_->numRows() < batchSize_) {
+ if (in_ == nullptr) {
+ if (merged_) {
+ break;
+ }
+
+ loadNextStream();
+ if (reachedEos_) {
+ break;
+ }
+ }
+ if (!resolveNextBlockType()) {
+ continue;
+ }
+
+ if (shouldSkipMerge()) {
+ break;
+ }
+
+ GLUTEN_ASSIGN_OR_THROW(
+ arrowBuffers,
+ BlockPayload::deserialize(
+ in_.get(), codec_, memoryManager_->defaultArrowMemoryPool(),
numRows, deserializeTime_, decompressTime_));
- return makeColumnarBatch(
- rowType_,
- numRows,
- std::move(arrowBuffers),
- dictionaryFields_,
- dictionaries_,
- memoryManager_->getLeafMemoryPool().get(),
- deserializeTime_);
+ blockTypeResolved_ = false;
+
+ if (!merged_) {
+ merged_ = std::make_unique<InMemoryPayload>(numRows, &isValidityBuffer_,
schema_, std::move(arrowBuffers));
+ arrowBuffers.clear();
+ continue;
+ }
+
+ auto mergedRows = merged_->numRows() + numRows;
+ if (mergedRows > batchSize_) {
+ break;
+ }
+
+ auto append = std::make_unique<InMemoryPayload>(numRows,
&isValidityBuffer_, schema_, std::move(arrowBuffers));
+ GLUTEN_ASSIGN_OR_THROW(
+ merged_,
+ InMemoryPayload::merge(std::move(merged_), std::move(append),
memoryManager_->defaultArrowMemoryPool()));
+ arrowBuffers.clear();
+ }
+
+ if (!merged_) {
+ return nullptr;
+ }
+
+ auto columnarBatch =
+ makeColumnarBatch(rowType_, std::move(merged_),
memoryManager_->getLeafMemoryPool().get(), deserializeTime_);
+
+ if (!arrowBuffers.empty()) {
+ merged_ = std::make_unique<InMemoryPayload>(numRows, &isValidityBuffer_,
schema_, std::move(arrowBuffers));
+ }
+
+ return columnarBatch;
}
VeloxSortShuffleReaderDeserializer::VeloxSortShuffleReaderDeserializer(
@@ -797,7 +918,8 @@
VeloxShuffleReaderDeserializerFactory::VeloxShuffleReaderDeserializerFactory(
int64_t readerBufferSize,
int64_t deserializerBufferSize,
VeloxMemoryManager* memoryManager,
- ShuffleWriterType shuffleWriterType)
+ ShuffleWriterType shuffleWriterType,
+ bool enableHashShuffleReaderStreamMerge)
: schema_(schema),
codec_(codec),
veloxCompressionType_(veloxCompressionType),
@@ -806,7 +928,8 @@
VeloxShuffleReaderDeserializerFactory::VeloxShuffleReaderDeserializerFactory(
readerBufferSize_(readerBufferSize),
deserializerBufferSize_(deserializerBufferSize),
memoryManager_(memoryManager),
- shuffleWriterType_(shuffleWriterType) {
+ shuffleWriterType_(shuffleWriterType),
+ enableHashShuffleReaderStreamMerge_(enableHashShuffleReaderStreamMerge) {
initFromSchema();
}
@@ -832,8 +955,12 @@ std::unique_ptr<ColumnarBatchIterator>
VeloxShuffleReaderDeserializerFactory::cr
schema_,
codec_,
rowType_,
+ batchSize_,
readerBufferSize_,
memoryManager_,
+ isValidityBuffer_,
+ hasComplexType_,
+ enableHashShuffleReaderStreamMerge_,
deserializeTime_,
decompressTime_);
case ShuffleWriterType::kSortShuffle:
diff --git a/cpp/velox/shuffle/VeloxShuffleReader.h b/cpp/velox/shuffle/VeloxShuffleReader.h
index f30595dde4..f92f0a2cc3 100644
--- a/cpp/velox/shuffle/VeloxShuffleReader.h
+++ b/cpp/velox/shuffle/VeloxShuffleReader.h
@@ -34,14 +34,20 @@ class VeloxHashShuffleReaderDeserializer final : public
ColumnarBatchIterator {
const std::shared_ptr<arrow::Schema>& schema,
const std::shared_ptr<arrow::util::Codec>& codec,
const facebook::velox::RowTypePtr& rowType,
+ int32_t batchSize,
int64_t readerBufferSize,
VeloxMemoryManager* memoryManager,
+ std::vector<bool> isValidityBuffer,
+ bool hasComplexType,
+ bool enableStreamMerge,
int64_t& deserializeTime,
int64_t& decompressTime);
std::shared_ptr<ColumnarBatch> next() override;
private:
+ bool shouldSkipMerge() const;
+
bool resolveNextBlockType();
void loadNextStream();
@@ -50,15 +56,21 @@ class VeloxHashShuffleReaderDeserializer final : public
ColumnarBatchIterator {
std::shared_ptr<arrow::Schema> schema_;
std::shared_ptr<arrow::util::Codec> codec_;
facebook::velox::RowTypePtr rowType_;
+ uint32_t batchSize_;
int64_t readerBufferSize_;
VeloxMemoryManager* memoryManager_;
+ std::vector<bool> isValidityBuffer_;
+ bool hasComplexType_;
+ bool enableStreamMerge_;
int64_t& deserializeTime_;
int64_t& decompressTime_;
std::shared_ptr<arrow::io::InputStream> in_{nullptr};
+ std::unique_ptr<InMemoryPayload> merged_{nullptr};
bool reachedEos_{false};
+ bool blockTypeResolved_{false};
std::vector<int32_t> dictionaryFields_{};
std::vector<facebook::velox::VectorPtr> dictionaries_{};
@@ -161,7 +173,8 @@ class VeloxShuffleReaderDeserializerFactory {
int64_t readerBufferSize,
int64_t deserializerBufferSize,
VeloxMemoryManager* memoryManager,
- ShuffleWriterType shuffleWriterType);
+ ShuffleWriterType shuffleWriterType,
+ bool enableHashShuffleReaderStreamMerge = false);
std::unique_ptr<ColumnarBatchIterator> createDeserializer(const
std::shared_ptr<StreamReader>& streamReader);
@@ -185,6 +198,7 @@ class VeloxShuffleReaderDeserializerFactory {
bool hasComplexType_{false};
ShuffleWriterType shuffleWriterType_;
+ bool enableHashShuffleReaderStreamMerge_;
int64_t deserializeTime_{0};
int64_t decompressTime_{0};
diff --git a/cpp/velox/tests/VeloxShuffleWriterTest.cc b/cpp/velox/tests/VeloxShuffleWriterTest.cc
index bfb783f4be..18046629d4 100644
--- a/cpp/velox/tests/VeloxShuffleWriterTest.cc
+++ b/cpp/velox/tests/VeloxShuffleWriterTest.cc
@@ -18,6 +18,10 @@
#include <arrow/c/bridge.h>
#include <arrow/io/api.h>
+#include <cstring>
+#include <optional>
+
+#include "shuffle/Payload.h"
#include "shuffle/VeloxHashShuffleWriter.h"
#include "shuffle/VeloxRssSortShuffleWriter.h"
#include "shuffle/VeloxSortShuffleWriter.h"
@@ -191,6 +195,23 @@ std::shared_ptr<PartitionWriter> createPartitionWriter(
}
}
+class MultiStreamReader : public StreamReader {
+ public:
+ explicit
MultiStreamReader(std::vector<std::shared_ptr<arrow::io::InputStream>> streams)
+ : streams_(std::move(streams)) {}
+
+ std::shared_ptr<arrow::io::InputStream> readNextStream(arrow::MemoryPool*)
override {
+ if (index_ >= streams_.size()) {
+ return nullptr;
+ }
+ return std::move(streams_[index_++]);
+ }
+
+ private:
+ std::vector<std::shared_ptr<arrow::io::InputStream>> streams_;
+ size_t index_{0};
+};
+
} // namespace
class VeloxShuffleWriterTestEnvironment : public ::testing::Environment {
@@ -441,6 +462,307 @@ class RoundRobinPartitioningShuffleWriterTest : public
VeloxShuffleWriterTest {
}
};
+class VeloxShuffleReaderStreamMergeTest : public ::testing::Test, public
VeloxShuffleWriterTestBase {
+ protected:
+ void SetUp() override {
+ VeloxShuffleWriterTestBase::setUpTestData();
+ }
+
+ std::shared_ptr<arrow::io::InputStream> writeSinglePartitionStream(
+ const RowVectorPtr& vector,
+ bool enableDictionary = false) {
+ return writeSinglePartitionStream(std::vector<RowVectorPtr>{vector},
enableDictionary);
+ }
+
+ std::shared_ptr<arrow::io::InputStream> writeSinglePartitionStream(
+ const std::vector<RowVectorPtr>& vectors,
+ bool enableDictionary = false) {
+ GLUTEN_ASSIGN_OR_THROW(auto dataFile,
createTempShuffleFile(localDirs_[0]));
+
+ auto shuffleWriterOptions = std::make_shared<HashShuffleWriterOptions>();
+ shuffleWriterOptions->partitioning = Partitioning::kSingle;
+ shuffleWriterOptions->splitBufferSize = 1024;
+
+ auto partitionWriter = createPartitionWriter(
+ PartitionWriterType::kLocal, 1, dataFile, localDirs_,
arrow::Compression::UNCOMPRESSED, 0, 0, enableDictionary);
+ GLUTEN_ASSIGN_OR_THROW(
+ auto shuffleWriter,
+ VeloxShuffleWriter::create(
+ ShuffleWriterType::kHashShuffle, 1, partitionWriter,
shuffleWriterOptions, getDefaultMemoryManager()));
+
+ for (const auto& vector : vectors) {
+ GLUTEN_THROW_NOT_OK(
+ shuffleWriter->write(std::make_shared<VeloxColumnarBatch>(vector),
ShuffleWriter::kMinMemLimit));
+ }
+ GLUTEN_THROW_NOT_OK(shuffleWriter->stop());
+
+ const auto& lengths = shuffleWriter->partitionLengths();
+ VELOX_CHECK_EQ(lengths.size(), 1);
+
+ std::shared_ptr<arrow::io::ReadableFile> file;
+ GLUTEN_ASSIGN_OR_THROW(file, arrow::io::ReadableFile::Open(dataFile));
+ readableFiles_.push_back(file);
+
+ GLUTEN_ASSIGN_OR_THROW(auto in,
arrow::io::RandomAccessFile::GetStream(file, 0, lengths[0]));
+ return in;
+ }
+
+ std::shared_ptr<arrow::io::InputStream>
makeDictionaryPayloadOnlyStream(uint32_t numRows) {
+ GLUTEN_ASSIGN_OR_THROW(
+ auto indices, arrow::AllocateResizableBuffer(sizeof(int32_t) *
numRows, arrow::default_memory_pool()));
+ std::vector<int32_t> rawIndices(numRows, 0);
+ std::memcpy(indices->mutable_data(), rawIndices.data(), sizeof(int32_t) *
numRows);
+ std::shared_ptr<arrow::Buffer> indicesBuffer = std::move(indices);
+
+ std::vector<std::shared_ptr<arrow::Buffer>>
buffers{std::shared_ptr<arrow::Buffer>{}, indicesBuffer};
+ static const std::vector<bool> kStringDictionaryPayloadValidity = {true,
false};
+ GLUTEN_ASSIGN_OR_THROW(
+ auto payload,
+ BlockPayload::fromBuffers(
+ Payload::kUncompressed,
+ numRows,
+ std::move(buffers),
+ &kStringDictionaryPayloadValidity,
+ arrow::default_memory_pool(),
+ nullptr));
+
+ GLUTEN_ASSIGN_OR_THROW(auto os,
arrow::io::BufferOutputStream::Create(1024, arrow::default_memory_pool()));
+ static constexpr uint8_t kDictionaryPayload =
static_cast<uint8_t>(BlockType::kDictionaryPayload);
+ GLUTEN_THROW_NOT_OK(os->Write(&kDictionaryPayload,
sizeof(kDictionaryPayload)));
+ GLUTEN_THROW_NOT_OK(payload->serialize(os.get()));
+
+ GLUTEN_ASSIGN_OR_THROW(auto buffer, os->Finish());
+ return std::make_shared<arrow::io::BufferReader>(buffer);
+ }
+
+ std::vector<RowVectorPtr> readStreams(
+ const RowTypePtr& rowType,
+ int32_t batchSize,
+ std::vector<std::shared_ptr<arrow::io::InputStream>> streams,
+ std::optional<bool> enableStreamMerge = std::nullopt) {
+ const auto schema = toArrowSchema(rowType,
getDefaultMemoryManager()->getLeafMemoryPool().get());
+ std::shared_ptr<arrow::util::Codec> codec =
+ createCompressionCodec(arrow::Compression::UNCOMPRESSED,
CodecBackend::NONE);
+ std::unique_ptr<VeloxShuffleReaderDeserializerFactory> deserializerFactory;
+ if (enableStreamMerge.has_value()) {
+ deserializerFactory =
std::make_unique<VeloxShuffleReaderDeserializerFactory>(
+ schema,
+ codec,
+ arrowCompressionTypeToVelox(arrow::Compression::UNCOMPRESSED),
+ rowType,
+ batchSize,
+ kDefaultReadBufferSize,
+ kDefaultDeserializerBufferSize,
+ getDefaultMemoryManager(),
+ ShuffleWriterType::kHashShuffle,
+ enableStreamMerge.value());
+ } else {
+ deserializerFactory =
std::make_unique<VeloxShuffleReaderDeserializerFactory>(
+ schema,
+ codec,
+ arrowCompressionTypeToVelox(arrow::Compression::UNCOMPRESSED),
+ rowType,
+ batchSize,
+ kDefaultReadBufferSize,
+ kDefaultDeserializerBufferSize,
+ getDefaultMemoryManager(),
+ ShuffleWriterType::kHashShuffle);
+ }
+
+ auto reader =
std::make_shared<VeloxShuffleReader>(std::move(deserializerFactory));
+ const auto iter =
reader->read(std::make_shared<MultiStreamReader>(std::move(streams)));
+
+ std::vector<RowVectorPtr> output;
+ while (iter->hasNext()) {
+
output.push_back(std::dynamic_pointer_cast<VeloxColumnarBatch>(iter->next())->getRowVector());
+ }
+ return output;
+ }
+
+ std::vector<std::shared_ptr<arrow::io::ReadableFile>> readableFiles_;
+};
+
+TEST_F(VeloxShuffleReaderStreamMergeTest, hashReaderMergesWithinStream) {
+ constexpr int32_t kBatchSize = 6;
+ std::vector<RowVectorPtr> inputs = {
+ makeRowVector({
+ makeFlatVector<int32_t>({1, 2}),
+ makeFlatVector<bool>({true, false}),
+ makeFlatVector<StringView>({"a", "bb"}),
+ makeNullableFlatVector<int64_t>({10, std::nullopt}),
+ }),
+ makeRowVector({
+ makeFlatVector<int32_t>({3, 4}),
+ makeFlatVector<bool>({false, true}),
+ makeFlatVector<StringView>({"ccc", "dddd"}),
+ makeNullableFlatVector<int64_t>({std::nullopt, 40}),
+ }),
+ makeRowVector({
+ makeFlatVector<int32_t>({5, 6}),
+ makeFlatVector<bool>({true, true}),
+ makeFlatVector<StringView>({"eeeee", "ffffff"}),
+ makeNullableFlatVector<int64_t>({50, 60}),
+ }),
+ makeRowVector({
+ makeFlatVector<int32_t>({7, 8}),
+ makeFlatVector<bool>({false, false}),
+ makeFlatVector<StringView>({"ggggggg", "hhhhhhhh"}),
+ makeNullableFlatVector<int64_t>({std::nullopt, std::nullopt}),
+ })};
+
+ std::vector<std::shared_ptr<arrow::io::InputStream>> streams =
{writeSinglePartitionStream(inputs)};
+
+ auto output = readStreams(facebook::velox::asRowType(inputs[0]->type()),
kBatchSize, std::move(streams), true);
+
+ ASSERT_EQ(output.size(), 2);
+ ASSERT_EQ(output[0]->size(), kBatchSize);
+ ASSERT_EQ(output[1]->size(), inputs[3]->size());
+ facebook::velox::test::assertEqualVectors(mergeRowVectors({inputs[0],
inputs[1], inputs[2]}), output[0]);
+ facebook::velox::test::assertEqualVectors(inputs[3], output[1]);
+}
+
+TEST_F(VeloxShuffleReaderStreamMergeTest, hashReaderDoesNotMergeByDefault) {
+ constexpr int32_t kBatchSize = 100;
+ std::vector<RowVectorPtr> inputs = {
+ makeRowVector({makeFlatVector<int32_t>({1, 2})}),
+ makeRowVector({makeFlatVector<int32_t>({3, 4})}),
+ makeRowVector({makeFlatVector<int32_t>({5, 6})})};
+
+ const auto rowType = facebook::velox::asRowType(inputs[0]->type());
+ auto output = readStreams(rowType, kBatchSize,
{writeSinglePartitionStream(inputs)});
+
+ ASSERT_EQ(output.size(), inputs.size());
+ for (size_t i = 0; i < inputs.size(); ++i) {
+ facebook::velox::test::assertEqualVectors(inputs[i], output[i]);
+ }
+}
+
+TEST_F(VeloxShuffleReaderStreamMergeTest,
hashReaderMergesWithinEachStreamOnly) {
+ constexpr int32_t kBatchSize = 100;
+ std::vector<RowVectorPtr> inputs = {
+ makeRowVector({makeFlatVector<int32_t>({1, 2})}),
+ makeRowVector({makeFlatVector<int32_t>({3, 4})}),
+ makeRowVector({makeFlatVector<int32_t>({5, 6})}),
+ makeRowVector({makeFlatVector<int32_t>({7, 8})})};
+
+ std::vector<std::shared_ptr<arrow::io::InputStream>> streams = {
+ writeSinglePartitionStream({inputs[0], inputs[1]}),
writeSinglePartitionStream({inputs[2], inputs[3]})};
+
+ auto output = readStreams(facebook::velox::asRowType(inputs[0]->type()),
kBatchSize, std::move(streams), true);
+
+ ASSERT_EQ(output.size(), 2);
+ facebook::velox::test::assertEqualVectors(mergeRowVectors({inputs[0],
inputs[1]}), output[0]);
+ facebook::velox::test::assertEqualVectors(mergeRowVectors({inputs[2],
inputs[3]}), output[1]);
+}
+
+TEST_F(VeloxShuffleReaderStreamMergeTest, hashReaderDoesNotMergeAcrossStreams)
{
+ constexpr int32_t kBatchSize = 6;
+ std::vector<RowVectorPtr> inputs = {
+ makeRowVector({makeFlatVector<int32_t>({1, 2})}),
+ makeRowVector({makeFlatVector<int32_t>({3, 4})}),
+ makeRowVector({makeFlatVector<int32_t>({5, 6})}),
+ makeRowVector({makeFlatVector<int32_t>({7, 8})})};
+
+ std::vector<std::shared_ptr<arrow::io::InputStream>> streams;
+ streams.reserve(inputs.size());
+ for (const auto& input : inputs) {
+ streams.push_back(writeSinglePartitionStream(input));
+ }
+
+ auto output = readStreams(facebook::velox::asRowType(inputs[0]->type()),
kBatchSize, std::move(streams), true);
+
+ ASSERT_EQ(output.size(), inputs.size());
+ for (size_t i = 0; i < inputs.size(); ++i) {
+ facebook::velox::test::assertEqualVectors(inputs[i], output[i]);
+ }
+}
+
+TEST_F(VeloxShuffleReaderStreamMergeTest,
hashReaderRejectsNonPositiveBatchSize) {
+ auto input = makeRowVector({makeFlatVector<int32_t>({1})});
+ const auto rowType = facebook::velox::asRowType(input->type());
+
+ EXPECT_THROW((void)readStreams(rowType, 0, {}), GlutenException);
+ EXPECT_THROW((void)readStreams(rowType, -1, {}), GlutenException);
+}
+
+TEST_F(VeloxShuffleReaderStreamMergeTest,
hashReaderCarriesOverPayloadThatWouldExceedBatchSize) {
+ constexpr int32_t kBatchSize = 6;
+ std::vector<RowVectorPtr> inputs = {
+ makeRowVector({makeFlatVector<int32_t>({1, 2, 3, 4})}),
+ makeRowVector({makeFlatVector<int32_t>({5, 6, 7, 8})}),
+ makeRowVector({makeFlatVector<int32_t>({9})})};
+
+ std::vector<std::shared_ptr<arrow::io::InputStream>> streams =
{writeSinglePartitionStream(inputs)};
+
+ auto output = readStreams(facebook::velox::asRowType(inputs[0]->type()),
kBatchSize, std::move(streams), true);
+
+ ASSERT_EQ(output.size(), 2);
+ facebook::velox::test::assertEqualVectors(inputs[0], output[0]);
+ facebook::velox::test::assertEqualVectors(mergeRowVectors({inputs[1],
inputs[2]}), output[1]);
+}
+
+TEST_F(VeloxShuffleReaderStreamMergeTest,
hashReaderFlushesMergedRowsBeforeDictionaryStream) {
+ constexpr int32_t kBatchSize = 100;
+ auto plainInput = makeRowVector({makeFlatVector<StringView>({"plain-1",
"plain-2"})});
+ auto dictionaryInput = makeRowVector({makeFlatVector<StringView>({"same",
"same", "same", "same"})});
+ std::vector<std::shared_ptr<arrow::io::InputStream>> streams = {
+ writeSinglePartitionStream(plainInput),
writeSinglePartitionStream(dictionaryInput, true)};
+
+ auto output = readStreams(facebook::velox::asRowType(plainInput->type()),
kBatchSize, std::move(streams), true);
+
+ ASSERT_EQ(output.size(), 2);
+ facebook::velox::test::assertEqualVectors(plainInput, output[0]);
+ facebook::velox::test::assertEqualVectors(dictionaryInput, output[1]);
+}
+
+TEST_F(VeloxShuffleReaderStreamMergeTest,
hashReaderDoesNotMergeComplexTypeStreams) {
+ constexpr int32_t kBatchSize = 100;
+ std::vector<RowVectorPtr> inputs = {
+ makeRowVector({makeArrayVector<int64_t>({{1, 2}, {3}})}),
+ makeRowVector({makeArrayVector<int64_t>({{4}, {5, 6}})})};
+
+ std::vector<std::shared_ptr<arrow::io::InputStream>> streams;
+ streams.reserve(inputs.size());
+ for (const auto& input : inputs) {
+ streams.push_back(writeSinglePartitionStream(input));
+ }
+
+ auto output = readStreams(facebook::velox::asRowType(inputs[0]->type()),
kBatchSize, std::move(streams), true);
+
+ ASSERT_EQ(output.size(), inputs.size());
+ facebook::velox::test::assertEqualVectors(inputs[0], output[0]);
+ facebook::velox::test::assertEqualVectors(inputs[1], output[1]);
+}
+
+TEST_F(VeloxShuffleReaderStreamMergeTest,
hashReaderDoesNotReuseDictionaryAcrossStreams) {
+ auto dictionaryInput = makeRowVector({makeFlatVector<StringView>({"same",
"same", "same", "same"})});
+ std::vector<std::shared_ptr<arrow::io::InputStream>> streams = {
+ writeSinglePartitionStream(dictionaryInput, true),
makeDictionaryPayloadOnlyStream(2)};
+
+ const auto rowType = facebook::velox::asRowType(dictionaryInput->type());
+ const auto schema = toArrowSchema(rowType,
getDefaultMemoryManager()->getLeafMemoryPool().get());
+ std::shared_ptr<arrow::util::Codec> codec =
+ createCompressionCodec(arrow::Compression::UNCOMPRESSED,
CodecBackend::NONE);
+ auto deserializerFactory =
std::make_unique<VeloxShuffleReaderDeserializerFactory>(
+ schema,
+ codec,
+ arrowCompressionTypeToVelox(arrow::Compression::UNCOMPRESSED),
+ rowType,
+ kDefaultBatchSize,
+ kDefaultReadBufferSize,
+ kDefaultDeserializerBufferSize,
+ getDefaultMemoryManager(),
+ ShuffleWriterType::kHashShuffle);
+
+ auto reader =
std::make_shared<VeloxShuffleReader>(std::move(deserializerFactory));
+ const auto iter =
reader->read(std::make_shared<MultiStreamReader>(std::move(streams)));
+
+ ASSERT_TRUE(iter->hasNext());
+ facebook::velox::test::assertEqualVectors(
+ dictionaryInput,
std::dynamic_pointer_cast<VeloxColumnarBatch>(iter->next())->getRowVector());
+ EXPECT_THROW((void)iter->hasNext(), GlutenException);
+}
+
TEST_P(SinglePartitioningShuffleWriterTest, single) {
if (GetParam().shuffleWriterType != ShuffleWriterType::kHashShuffle) {
return;
diff --git a/docs/velox-configuration.md b/docs/velox-configuration.md
index a608dfbc45..a0c0691e2f 100644
--- a/docs/velox-configuration.md
+++ b/docs/velox-configuration.md
@@ -9,77 +9,78 @@ nav_order: 16
## Gluten Velox backend configurations
-| Key
| Default |
Description
[...]
-|----------------------------------------------------------------------------------|-------------------|----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
[...]
-| spark.gluten.sql.columnar.backend.velox.IOThreads
| <undefined> | The Size of the IO thread pool in the Connector. This
thread pool is used for split preloading and DirectBufferedInput. By default,
the value is the same as the maximum task slots per Spark executor.
[...]
-| spark.gluten.sql.columnar.backend.velox.SplitPreloadPerDriver
| 2 | The split preload per task
[...]
-| spark.gluten.sql.columnar.backend.velox.abandonPartialAggregationMinPct
| 90 | If partial aggregation aggregationPct greater than
this value, partial aggregation may be early abandoned. Note: this option only
works when flushable partial aggregation is enabled. Ignored when
spark.gluten.sql.columnar.backend.velox.flushablePartialAggregation=false.
[...]
-| spark.gluten.sql.columnar.backend.velox.abandonPartialAggregationMinRows
| 100000 | If partial aggregation input rows number greater than
this value, partial aggregation may be early abandoned. Note: this option only
works when flushable partial aggregation is enabled. Ignored when
spark.gluten.sql.columnar.backend.velox.flushablePartialAggregation=false.
[...]
-| spark.gluten.sql.columnar.backend.velox.asyncTimeoutOnTaskStopping
| 30000ms | Timeout in milliseconds when waiting for
runtime-scoped async work to finish during teardown.
[...]
-| spark.gluten.sql.columnar.backend.velox.cacheEnabled
| false | Enable Velox cache, default off. It's recommended to
enablesoft-affinity as well when enable velox cache.
[...]
-| spark.gluten.sql.columnar.backend.velox.cachePrefetchMinPct
| 0 | Set prefetch cache min pct for velox file scan
[...]
-| spark.gluten.sql.columnar.backend.velox.checkUsageLeak
| true | Enable check memory usage leak.
[...]
-| spark.gluten.sql.columnar.backend.velox.cudf.batchSize
| 2147483647 | Cudf input batch size after shuffle reader
[...]
-| spark.gluten.sql.columnar.backend.velox.cudf.enableTableScan
| false | Enable cudf table scan
[...]
-| spark.gluten.sql.columnar.backend.velox.cudf.enableValidation
| true | Heuristics you can apply to validate a cuDF/GPU plan
and only offload when the entire stage can be fully and profitably executed on
GPU
[...]
-| spark.gluten.sql.columnar.backend.velox.cudf.memoryPercent
| 50 | The initial percent of GPU memory to allocate for
memory resource for one thread.
[...]
-| spark.gluten.sql.columnar.backend.velox.cudf.memoryResource
| async | GPU RMM memory resource.
[...]
-| spark.gluten.sql.columnar.backend.velox.cudf.shuffleMaxPrefetchBytes
| 1028MB | Maximum bytes to prefetch in CPU memory during GPU
shuffle read while waiting for GPU available.
[...]
-| spark.gluten.sql.columnar.backend.velox.directorySizeGuess
| 32KB | Deprecated, rename to
spark.gluten.sql.columnar.backend.velox.footerEstimatedSize
[...]
-| spark.gluten.sql.columnar.backend.velox.enableTimestampNtzValidation
| true | Enable validation fallback for TimestampNTZ type.
When true (default), any plan containing TimestampNTZ will fall back to Spark
execution. Set to false during development/testing of TimestampNTZ support to
allow native execution.
[...]
-| spark.gluten.sql.columnar.backend.velox.fileHandleCacheEnabled
| false | Disables caching if false. File handle cache should
be disabled if files are mutable, i.e. file content may change while file path
stays the same.
[...]
-| spark.gluten.sql.columnar.backend.velox.filePreloadThreshold
| 1MB | Set the file preload threshold for velox file scan,
refer to Velox's file-preload-threshold
[...]
-| spark.gluten.sql.columnar.backend.velox.floatingPointMode
| loose | Config used to control the tolerance of floating
point operations alignment with Spark. When the mode is set to strict, flushing
is disabled for sum(float/double)and avg(float/double). When set to loose,
flushing will be enabled.
[...]
-| spark.gluten.sql.columnar.backend.velox.flushablePartialAggregation
| true | Enable flushable aggregation. If true, Gluten will
try converting regular aggregation into Velox's flushable aggregation when
applicable. A flushable aggregation could emit intermediate result at anytime
when memory is full / data reduction ratio is low.
[...]
-| spark.gluten.sql.columnar.backend.velox.footerEstimatedSize
| 32KB | Set the footer estimated size for velox file scan,
refer to Velox's footer-estimated-size
[...]
-|
spark.gluten.sql.columnar.backend.velox.hashProbe.bloomFilterPushdown.maxSize
| 0b | The maximum byte size of Bloom filter that can be
generated from hash probe. When set to 0, no Bloom filter will be generated. To
achieve optimal performance, this should not be too larger than the CPU cache
size on the host.
[...]
-|
spark.gluten.sql.columnar.backend.velox.hashProbe.dynamicFilterPushdown.enabled
| true | Whether hash probe can generate any dynamic filter
(including Bloom filter) and push down to upstream operators.
[...]
-| spark.gluten.sql.columnar.backend.velox.loadQuantum
| 256MB | Set the load quantum for velox file scan, recommend
to use the default value (256MB) for performance consideration. If Velox cache
is enabled, it can be 8MB at most.
[...]
-| spark.gluten.sql.columnar.backend.velox.maxCoalescedBytes
| 64MB | Set the max coalesced bytes for velox file scan
[...]
-| spark.gluten.sql.columnar.backend.velox.maxCoalescedDistance
| 512KB | Set the max coalesced distance bytes for velox file
scan
[...]
-| spark.gluten.sql.columnar.backend.velox.maxCompiledRegexes
| 100 | Controls maximum number of compiled regular
expression patterns per function instance per thread of execution.
[...]
-| spark.gluten.sql.columnar.backend.velox.maxExtendedPartialAggregationMemory
| <undefined> | Set the max extended memory of partial aggregation in
bytes. When this option is set to a value greater than 0, it will override
spark.gluten.sql.columnar.backend.velox.maxExtendedPartialAggregationMemoryRatio.
Note: this option only works when flushable partial aggregation is enabled.
Ignored when
spark.gluten.sql.columnar.backend.velox.flushablePartialAggregation=false.
[...]
-|
spark.gluten.sql.columnar.backend.velox.maxExtendedPartialAggregationMemoryRatio
| 0.15 | Set the max extended memory of partial aggregation as
maxExtendedPartialAggregationMemoryRatio of offheap size. Note: this option
only works when flushable partial aggregation is enabled. Ignored when
spark.gluten.sql.columnar.backend.velox.flushablePartialAggregation=false.
[...]
-| spark.gluten.sql.columnar.backend.velox.maxPartialAggregationMemory
| <undefined> | Set the max memory of partial aggregation in bytes.
When this option is set to a value greater than 0, it will override
spark.gluten.sql.columnar.backend.velox.maxPartialAggregationMemoryRatio. Note:
this option only works when flushable partial aggregation is enabled. Ignored
when spark.gluten.sql.columnar.backend.velox.flushablePartialAggregation=false.
[...]
-| spark.gluten.sql.columnar.backend.velox.maxPartialAggregationMemoryRatio
| 0.1 | Set the max memory of partial aggregation as
maxPartialAggregationMemoryRatio of offheap size. Note: this option only works
when flushable partial aggregation is enabled. Ignored when
spark.gluten.sql.columnar.backend.velox.flushablePartialAggregation=false.
[...]
-| spark.gluten.sql.columnar.backend.velox.maxPartitionsPerWritersSession
| 10000 | Maximum number of partitions per a single table
writer instance.
[...]
-| spark.gluten.sql.columnar.backend.velox.maxSpillBytes
| 100G | The maximum file size of a query
[...]
-| spark.gluten.sql.columnar.backend.velox.maxSpillFileSize
| 1GB | The maximum size of a single spill file created
[...]
-| spark.gluten.sql.columnar.backend.velox.maxSpillLevel
| 4 | The max allowed spilling level with zero being the
initial spilling level
[...]
-| spark.gluten.sql.columnar.backend.velox.maxSpillRunRows
| 3M | The maximum row size of a single spill run
[...]
-| spark.gluten.sql.columnar.backend.velox.maxTargetFileSize
| 0b | The target file size for each output file when
writing data. 0 means no limit on target file size, and the actual file size
will be determined by other factors such as max partition number and shuffle
batch size.
[...]
-| spark.gluten.sql.columnar.backend.velox.memCacheSize
| 1GB | The memory cache size
[...]
-| spark.gluten.sql.columnar.backend.velox.memInitCapacity
| 8MB | The initial memory capacity to reserve for a newly
created Velox query memory pool.
[...]
-|
spark.gluten.sql.columnar.backend.velox.memoryPoolCapacityTransferAcrossTasks
| true | Whether to allow memory capacity transfer between memory
pools from different tasks.
[...]
-| spark.gluten.sql.columnar.backend.velox.memoryUseHugePages
| false | Use explicit huge pages for Velox memory allocation.
[...]
-| spark.gluten.sql.columnar.backend.velox.orc.scan.enabled
| true | Enable velox orc scan. If disabled, vanilla spark orc
scan will be used.
[...]
-| spark.gluten.sql.columnar.backend.velox.orcUseColumnNames
| true | Maps table field names to file field names using
names, not indices for ORC files.
[...]
-| spark.gluten.sql.columnar.backend.velox.parquet.pageSizeBytes
| 1MB | The page size in bytes is for compression.
[...]
-| spark.gluten.sql.columnar.backend.velox.parquetUseColumnNames
| true | Maps table field names to file field names using
names, not indices for Parquet files.
[...]
-| spark.gluten.sql.columnar.backend.velox.prefetchRowGroups
| 1 | Set the prefetch row groups for velox file scan
[...]
-| spark.gluten.sql.columnar.backend.velox.queryTraceEnabled
| false | Enable query tracing flag.
[...]
-| spark.gluten.sql.columnar.backend.velox.reclaimMaxWaitMs
| 3600000ms | The max time in ms to wait for memory reclaim.
[...]
-| spark.gluten.sql.columnar.backend.velox.resizeBatches.shuffleInput
| true | If true, combine small columnar batches together
before sending to shuffle. The default minimum output batch size is equal to
0.25 * spark.gluten.sql.columnar.maxBatchSize
[...]
-| spark.gluten.sql.columnar.backend.velox.resizeBatches.shuffleInput.minSize
| <undefined> | The minimum batch size for shuffle. If size of an
input batch is smaller than the value, it will be combined with other batches
before sending to shuffle. Only functions when
spark.gluten.sql.columnar.backend.velox.resizeBatches.shuffleInput is set to
true. Default value: 0.25 * <max batch size>
[...]
-|
spark.gluten.sql.columnar.backend.velox.resizeBatches.shuffleInputOuptut.minSize
| <undefined> | The minimum batch size for shuffle input and output. If
size of an input batch is smaller than the value, it will be combined with
other batches before sending to shuffle. The same applies for batches output by
shuffle read. Only functions when
spark.gluten.sql.columnar.backend.velox.resizeBatches.shuffleInput or
spark.gluten.sql.columnar.backend.velox.resizeBatches.shuffleOutput is s [...]
-| spark.gluten.sql.columnar.backend.velox.resizeBatches.shuffleOutput
| false | If true, combine small columnar batches together
right after shuffle read. The default minimum output batch size is equal to
0.25 * spark.gluten.sql.columnar.maxBatchSize
[...]
-| spark.gluten.sql.columnar.backend.velox.showTaskMetricsWhenFinished
| false | Show velox full task metrics when finished.
[...]
-| spark.gluten.sql.columnar.backend.velox.spillFileSystem
| local | The filesystem used to store spill data. local: The
local file system. heap-over-local: Write file to JVM heap if having extra heap
space. Otherwise write to local file system.
[...]
-| spark.gluten.sql.columnar.backend.velox.spillStrategy
| auto | none: Disable spill on Velox backend; auto: Let Spark
memory manager manage Velox's spilling
[...]
-| spark.gluten.sql.columnar.backend.velox.ssdCacheIOThreads
| 1 | The IO threads for cache promoting
[...]
-| spark.gluten.sql.columnar.backend.velox.ssdCachePath
| /tmp | The folder to store the cache files, better on SSD
[...]
-| spark.gluten.sql.columnar.backend.velox.ssdCacheShards
| 1 | The cache shards
[...]
-| spark.gluten.sql.columnar.backend.velox.ssdCacheSize
| 1GB | The SSD cache size, will do memory caching only if
this value = 0
[...]
-| spark.gluten.sql.columnar.backend.velox.ssdCheckpointIntervalBytes
| 0 | Checkpoint after every 'checkpointIntervalBytes' for
SSD cache. 0 means no checkpointing.
[...]
-| spark.gluten.sql.columnar.backend.velox.ssdChecksumEnabled
| false | If true, checksum write to SSD is enabled.
[...]
-| spark.gluten.sql.columnar.backend.velox.ssdChecksumReadVerificationEnabled
| false | If true, checksum read verification from SSD is
enabled.
[...]
-| spark.gluten.sql.columnar.backend.velox.ssdDisableFileCow
| false | True if copy on write should be disabled.
[...]
-| spark.gluten.sql.columnar.backend.velox.ssdODirect
| false | The O_DIRECT flag for cache writing
[...]
-| spark.gluten.sql.columnar.backend.velox.valueStream.dynamicFilter.enabled
| false | Whether to apply dynamic filters pushed down from
hash probe in the ValueStream (shuffle reader) operator to filter rows before
they reach the hash join.
[...]
-| spark.gluten.sql.enable.enhancedFeatures
| true | Enable some features including iceberg native write
and other features.
[...]
-| spark.gluten.sql.rewrite.castArrayToString
| true | When true, rewrite `cast(array as String)` to
`concat('[', array_join(array, ', ', null), ']')` to allow offloading to Velox.
[...]
-| spark.gluten.velox.broadcast.build.targetBytesPerThread
| 32MB | It is used to calculate the number of hash table
build threads. Based on our testing across various thresholds (1MB to 128MB),
we recommend a value of 32MB or 64MB, as these consistently provided the most
significant performance gains.
[...]
-| spark.gluten.velox.castFromVarcharAddTrimNode
| false | If true, will add a trim node which has the same
semantic as vanilla Spark to CAST-from-varchar.Otherwise, do nothing.
[...]
+| Key
| Default |
Description
[...]
+|----------------------------------------------------------------------------------|-------------------|----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
[...]
+| spark.gluten.sql.columnar.backend.velox.IOThreads
| <undefined> | The size of the IO thread pool in the Connector. This
thread pool is used for split preloading and DirectBufferedInput. By default,
the value is the same as the maximum task slots per Spark executor.
[...]
+| spark.gluten.sql.columnar.backend.velox.SplitPreloadPerDriver
| 2 | The split preload per task
[...]
+| spark.gluten.sql.columnar.backend.velox.abandonPartialAggregationMinPct
| 90 | If partial aggregation aggregationPct greater than
this value, partial aggregation may be early abandoned. Note: this option only
works when flushable partial aggregation is enabled. Ignored when
spark.gluten.sql.columnar.backend.velox.flushablePartialAggregation=false.
[...]
+| spark.gluten.sql.columnar.backend.velox.abandonPartialAggregationMinRows
| 100000 | If partial aggregation input rows number greater than
this value, partial aggregation may be early abandoned. Note: this option only
works when flushable partial aggregation is enabled. Ignored when
spark.gluten.sql.columnar.backend.velox.flushablePartialAggregation=false.
[...]
+| spark.gluten.sql.columnar.backend.velox.asyncTimeoutOnTaskStopping
| 30000ms | Timeout in milliseconds when waiting for
runtime-scoped async work to finish during teardown.
[...]
+| spark.gluten.sql.columnar.backend.velox.cacheEnabled
| false | Enable Velox cache, default off. It's recommended to
enable soft-affinity as well when enabling the Velox cache.
[...]
+| spark.gluten.sql.columnar.backend.velox.cachePrefetchMinPct
| 0 | Set prefetch cache min pct for velox file scan
[...]
+| spark.gluten.sql.columnar.backend.velox.checkUsageLeak
| true | Enable check memory usage leak.
[...]
+| spark.gluten.sql.columnar.backend.velox.cudf.batchSize
| 2147483647 | Cudf input batch size after shuffle reader
[...]
+| spark.gluten.sql.columnar.backend.velox.cudf.enableTableScan
| false | Enable cudf table scan
[...]
+| spark.gluten.sql.columnar.backend.velox.cudf.enableValidation
| true | Heuristics you can apply to validate a cuDF/GPU plan
and only offload when the entire stage can be fully and profitably executed on
GPU
[...]
+| spark.gluten.sql.columnar.backend.velox.cudf.memoryPercent
| 50 | The initial percent of GPU memory to allocate for
memory resource for one thread.
[...]
+| spark.gluten.sql.columnar.backend.velox.cudf.memoryResource
| async | GPU RMM memory resource.
[...]
+| spark.gluten.sql.columnar.backend.velox.cudf.shuffleMaxPrefetchBytes
| 1028MB | Maximum bytes to prefetch in CPU memory during GPU
shuffle read while waiting for GPU available.
[...]
+| spark.gluten.sql.columnar.backend.velox.directorySizeGuess
| 32KB | Deprecated; renamed to
spark.gluten.sql.columnar.backend.velox.footerEstimatedSize
[...]
+| spark.gluten.sql.columnar.backend.velox.enableTimestampNtzValidation
| true | Enable validation fallback for TimestampNTZ type.
When true (default), any plan containing TimestampNTZ will fall back to Spark
execution. Set to false during development/testing of TimestampNTZ support to
allow native execution.
[...]
+| spark.gluten.sql.columnar.backend.velox.fileHandleCacheEnabled
| false | Disables caching if false. File handle cache should
be disabled if files are mutable, i.e. file content may change while file path
stays the same.
[...]
+| spark.gluten.sql.columnar.backend.velox.filePreloadThreshold
| 1MB | Set the file preload threshold for velox file scan,
refer to Velox's file-preload-threshold
[...]
+| spark.gluten.sql.columnar.backend.velox.floatingPointMode
| loose | Config used to control the tolerance of floating
point operations alignment with Spark. When the mode is set to strict, flushing
is disabled for sum(float/double) and avg(float/double). When set to loose,
flushing will be enabled.
[...]
+| spark.gluten.sql.columnar.backend.velox.flushablePartialAggregation
| true | Enable flushable aggregation. If true, Gluten will
try converting regular aggregation into Velox's flushable aggregation when
applicable. A flushable aggregation could emit intermediate result at anytime
when memory is full / data reduction ratio is low.
[...]
+| spark.gluten.sql.columnar.backend.velox.footerEstimatedSize
| 32KB | Set the footer estimated size for velox file scan,
refer to Velox's footer-estimated-size
[...]
+|
spark.gluten.sql.columnar.backend.velox.hashProbe.bloomFilterPushdown.maxSize
| 0b | The maximum byte size of Bloom filter that can be
generated from hash probe. When set to 0, no Bloom filter will be generated. To
achieve optimal performance, this should not be much larger than the CPU cache
size on the host.
[...]
+|
spark.gluten.sql.columnar.backend.velox.hashProbe.dynamicFilterPushdown.enabled
| true | Whether hash probe can generate any dynamic filter
(including Bloom filter) and push down to upstream operators.
[...]
+|
spark.gluten.sql.columnar.backend.velox.hashShuffle.reader.streamMerge.enabled
| false | Enables a reader-side raw payload merge fast path for
plain hash shuffle payloads within each shuffle input stream. This path merges
payload buffers before Velox vectors are materialized, so it has lower
per-batch overhead than generic VeloxResizeBatchesExec resizing, but it only
covers plain payloads. Complex types and dictionary-encoded payloads are not
merged by this path. VeloxRes [...]
+| spark.gluten.sql.columnar.backend.velox.loadQuantum
| 256MB | Set the load quantum for velox file scan, recommend
to use the default value (256MB) for performance consideration. If Velox cache
is enabled, it can be 8MB at most.
[...]
+| spark.gluten.sql.columnar.backend.velox.maxCoalescedBytes
| 64MB | Set the max coalesced bytes for velox file scan
[...]
+| spark.gluten.sql.columnar.backend.velox.maxCoalescedDistance
| 512KB | Set the max coalesced distance bytes for velox file
scan
[...]
+| spark.gluten.sql.columnar.backend.velox.maxCompiledRegexes
| 100 | Controls maximum number of compiled regular
expression patterns per function instance per thread of execution.
[...]
+| spark.gluten.sql.columnar.backend.velox.maxExtendedPartialAggregationMemory
| <undefined> | Set the max extended memory of partial aggregation in
bytes. When this option is set to a value greater than 0, it will override
spark.gluten.sql.columnar.backend.velox.maxExtendedPartialAggregationMemoryRatio.
Note: this option only works when flushable partial aggregation is enabled.
Ignored when
spark.gluten.sql.columnar.backend.velox.flushablePartialAggregation=false.
[...]
+|
spark.gluten.sql.columnar.backend.velox.maxExtendedPartialAggregationMemoryRatio
| 0.15 | Set the max extended memory of partial aggregation as
maxExtendedPartialAggregationMemoryRatio of offheap size. Note: this option
only works when flushable partial aggregation is enabled. Ignored when
spark.gluten.sql.columnar.backend.velox.flushablePartialAggregation=false.
[...]
+| spark.gluten.sql.columnar.backend.velox.maxPartialAggregationMemory
| <undefined> | Set the max memory of partial aggregation in bytes.
When this option is set to a value greater than 0, it will override
spark.gluten.sql.columnar.backend.velox.maxPartialAggregationMemoryRatio. Note:
this option only works when flushable partial aggregation is enabled. Ignored
when spark.gluten.sql.columnar.backend.velox.flushablePartialAggregation=false.
[...]
+| spark.gluten.sql.columnar.backend.velox.maxPartialAggregationMemoryRatio
| 0.1 | Set the max memory of partial aggregation as
maxPartialAggregationMemoryRatio of offheap size. Note: this option only works
when flushable partial aggregation is enabled. Ignored when
spark.gluten.sql.columnar.backend.velox.flushablePartialAggregation=false.
[...]
+| spark.gluten.sql.columnar.backend.velox.maxPartitionsPerWritersSession
| 10000 | Maximum number of partitions per a single table
writer instance.
[...]
+| spark.gluten.sql.columnar.backend.velox.maxSpillBytes
| 100G | The maximum file size of a query
[...]
+| spark.gluten.sql.columnar.backend.velox.maxSpillFileSize
| 1GB | The maximum size of a single spill file created
[...]
+| spark.gluten.sql.columnar.backend.velox.maxSpillLevel
| 4 | The max allowed spilling level with zero being the
initial spilling level
[...]
+| spark.gluten.sql.columnar.backend.velox.maxSpillRunRows
| 3M | The maximum row size of a single spill run
[...]
+| spark.gluten.sql.columnar.backend.velox.maxTargetFileSize
| 0b | The target file size for each output file when
writing data. 0 means no limit on target file size, and the actual file size
will be determined by other factors such as max partition number and shuffle
batch size.
[...]
+| spark.gluten.sql.columnar.backend.velox.memCacheSize
| 1GB | The memory cache size
[...]
+| spark.gluten.sql.columnar.backend.velox.memInitCapacity
| 8MB | The initial memory capacity to reserve for a newly
created Velox query memory pool.
[...]
+|
spark.gluten.sql.columnar.backend.velox.memoryPoolCapacityTransferAcrossTasks
| true | Whether to allow memory capacity transfer between memory
pools from different tasks.
[...]
+| spark.gluten.sql.columnar.backend.velox.memoryUseHugePages
| false | Use explicit huge pages for Velox memory allocation.
[...]
+| spark.gluten.sql.columnar.backend.velox.orc.scan.enabled
| true | Enable velox orc scan. If disabled, vanilla spark orc
scan will be used.
[...]
+| spark.gluten.sql.columnar.backend.velox.orcUseColumnNames
| true | Maps table field names to file field names using
names, not indices for ORC files.
[...]
+| spark.gluten.sql.columnar.backend.velox.parquet.pageSizeBytes
| 1MB | The page size in bytes is for compression.
[...]
+| spark.gluten.sql.columnar.backend.velox.parquetUseColumnNames
| true | Maps table field names to file field names using
names, not indices for Parquet files.
[...]
+| spark.gluten.sql.columnar.backend.velox.prefetchRowGroups
| 1 | Set the prefetch row groups for velox file scan
[...]
+| spark.gluten.sql.columnar.backend.velox.queryTraceEnabled
| false | Enable query tracing flag.
[...]
+| spark.gluten.sql.columnar.backend.velox.reclaimMaxWaitMs
| 3600000ms | The max time in ms to wait for memory reclaim.
[...]
+| spark.gluten.sql.columnar.backend.velox.resizeBatches.shuffleInput
| true | If true, combine small columnar batches together
before sending to shuffle. The default minimum output batch size is equal to
0.25 * spark.gluten.sql.columnar.maxBatchSize
[...]
+| spark.gluten.sql.columnar.backend.velox.resizeBatches.shuffleInput.minSize
| <undefined> | The minimum batch size for shuffle. If size of an
input batch is smaller than the value, it will be combined with other batches
before sending to shuffle. Only functions when
spark.gluten.sql.columnar.backend.velox.resizeBatches.shuffleInput is set to
true. Default value: 0.25 * <max batch size>
[...]
+|
spark.gluten.sql.columnar.backend.velox.resizeBatches.shuffleInputOuptut.minSize
| <undefined> | The minimum batch size for shuffle input and output. If
size of an input batch is smaller than the value, it will be combined with
other batches before sending to shuffle. The same applies for batches output by
shuffle read. Only functions when
spark.gluten.sql.columnar.backend.velox.resizeBatches.shuffleInput or
spark.gluten.sql.columnar.backend.velox.resizeBatches.shuffleOutput is s [...]
+| spark.gluten.sql.columnar.backend.velox.resizeBatches.shuffleOutput
| false | If true, combine small columnar batches together
right after shuffle read. The default minimum output batch size is equal to
0.25 * spark.gluten.sql.columnar.maxBatchSize
[...]
+| spark.gluten.sql.columnar.backend.velox.showTaskMetricsWhenFinished
| false | Show velox full task metrics when finished.
[...]
+| spark.gluten.sql.columnar.backend.velox.spillFileSystem
| local | The filesystem used to store spill data. local: The
local file system. heap-over-local: Write file to JVM heap if having extra heap
space. Otherwise write to local file system.
[...]
+| spark.gluten.sql.columnar.backend.velox.spillStrategy
| auto | none: Disable spill on Velox backend; auto: Let Spark
memory manager manage Velox's spilling
[...]
+| spark.gluten.sql.columnar.backend.velox.ssdCacheIOThreads
| 1 | The IO threads for cache promoting
[...]
+| spark.gluten.sql.columnar.backend.velox.ssdCachePath
| /tmp | The folder to store the cache files, better on SSD
[...]
+| spark.gluten.sql.columnar.backend.velox.ssdCacheShards
| 1 | The cache shards
[...]
+| spark.gluten.sql.columnar.backend.velox.ssdCacheSize
| 1GB | The SSD cache size, will do memory caching only if
this value = 0
[...]
+| spark.gluten.sql.columnar.backend.velox.ssdCheckpointIntervalBytes
| 0 | Checkpoint after every 'checkpointIntervalBytes' for
SSD cache. 0 means no checkpointing.
[...]
+| spark.gluten.sql.columnar.backend.velox.ssdChecksumEnabled
| false | If true, checksum write to SSD is enabled.
[...]
+| spark.gluten.sql.columnar.backend.velox.ssdChecksumReadVerificationEnabled
| false | If true, checksum read verification from SSD is
enabled.
[...]
+| spark.gluten.sql.columnar.backend.velox.ssdDisableFileCow
| false | True if copy on write should be disabled.
[...]
+| spark.gluten.sql.columnar.backend.velox.ssdODirect
| false | The O_DIRECT flag for cache writing
[...]
+| spark.gluten.sql.columnar.backend.velox.valueStream.dynamicFilter.enabled
| false | Whether to apply dynamic filters pushed down from
hash probe in the ValueStream (shuffle reader) operator to filter rows before
they reach the hash join.
[...]
+| spark.gluten.sql.enable.enhancedFeatures
| true | Enable some features including iceberg native write
and other features.
[...]
+| spark.gluten.sql.rewrite.castArrayToString
| true | When true, rewrite `cast(array as String)` to
`concat('[', array_join(array, ', ', null), ']')` to allow offloading to Velox.
[...]
+| spark.gluten.velox.broadcast.build.targetBytesPerThread
| 32MB | It is used to calculate the number of hash table
build threads. Based on our testing across various thresholds (1MB to 128MB),
we recommend a value of 32MB or 64MB, as these consistently provided the most
significant performance gains.
[...]
+| spark.gluten.velox.castFromVarcharAddTrimNode
| false | If true, will add a trim node which has the same
semantics as vanilla Spark to CAST-from-varchar. Otherwise, do nothing.
[...]
## Gluten Velox backend *experimental* configurations
diff --git
a/gluten-arrow/src/main/java/org/apache/gluten/vectorized/ShuffleReaderJniWrapper.java
b/gluten-arrow/src/main/java/org/apache/gluten/vectorized/ShuffleReaderJniWrapper.java
index 6a0f2130d7..449bc86558 100644
---
a/gluten-arrow/src/main/java/org/apache/gluten/vectorized/ShuffleReaderJniWrapper.java
+++
b/gluten-arrow/src/main/java/org/apache/gluten/vectorized/ShuffleReaderJniWrapper.java
@@ -42,7 +42,8 @@ public class ShuffleReaderJniWrapper implements RuntimeAware {
int batchSize,
long readerBufferSize,
long deserializerBufferSize,
- String shuffleWriterType);
+ String shuffleWriterType,
+ boolean enableHashShuffleReaderStreamMerge);
public native long read(long shuffleReaderHandle, ShuffleStreamReader
streamReader);
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]