This is an automated email from the ASF dual-hosted git repository.
yaooqinn pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/gluten.git
The following commit(s) were added to refs/heads/main by this push:
new 71302c6392 [VL] Add Velox batch resizer copyRanges fast path (#12101)
71302c6392 is described below
commit 71302c63929b44ef6a7658818dcae0386308b2f6
Author: Zhen Li <[email protected]>
AuthorDate: Wed May 20 12:43:04 2026 +0800
[VL] Add Velox batch resizer copyRanges fast path (#12101)
* [VL] Enable Velox batch resizer copyRanges fast path
Add a default-enabled VeloxBatchResizer fast path that collects small dense
batches, allocates the output RowVector once, and bulk-copies child vector
ranges with copyRanges. The config remains available as an opt-out switch.
Wire the flag through Scala, Java, and JNI, add C++ coverage for fast-path
and fallback behavior, add config default coverage, and add dense-vector
benchmark scenarios comparing the append opt-out baseline, default copyRanges
path, direct child copyRanges, reader-side raw payload bulk-copy model, and
pre-merged flush model.
Benchmark results from velox_batch_resizer_benchmark (CPU time; ASLR
enabled, so numbers may have noise):
- Mixed_64x64: append opt-out baseline 95.1us, default copyRanges 19.7us,
direct child copyRanges 17.4us, raw bulk-copy model 33.3us.
- Mixed_16x256: append opt-out baseline 33.7us, default copyRanges 6.4us,
direct child copyRanges 5.0us, raw bulk-copy model 10.5us.
- Mixed_256x16: append opt-out baseline 217.7us, default copyRanges 50.4us,
direct child copyRanges 28.6us, raw bulk-copy model 112.6us.
- Fixed2_64x64: append opt-out baseline 26.6us, default copyRanges 5.5us,
direct child copyRanges 2.0us, raw bulk-copy model 13.7us.
- Fixed16_64x64: append opt-out baseline 121.6us, default copyRanges
27.0us, direct child copyRanges 17.4us, raw bulk-copy model 92.9us.
- LongString_64x64: append opt-out baseline 31.7us, default copyRanges
7.1us, direct child copyRanges 4.5us, raw bulk-copy model 15.3us.
- BoolHeavy_64x64: append opt-out baseline 68.7us, default copyRanges
10.9us, direct child copyRanges 5.4us, raw bulk-copy model 37.7us.
Co-authored-by: Copilot <[email protected]>
* add test
* [VL] Add batch resizer fallback benchmarks
Add dictionary-heavy and constant-heavy VeloxBatchResizer benchmark
scenarios so the default copyRanges-enabled path is compared against the append
opt-out baseline when inputs fall back to RowVector::copy.
Co-authored-by: Copilot <[email protected]>
* [VL] Fix batch resizer CI regressions
Co-authored-by: Copilot <[email protected]>
---------
Co-authored-by: Copilot <[email protected]>
---
.../org/apache/gluten/utils/VeloxBatchResizer.java | 2 +
.../gluten/utils/VeloxBatchResizerJniWrapper.java | 1 +
.../org/apache/gluten/config/VeloxConfig.scala | 21 +
.../gluten/execution/VeloxResizeBatchesExec.scala | 8 +-
.../gluten/config/AllVeloxConfiguration.scala | 5 +
cpp/core/shuffle/Payload.h | 2 +
cpp/velox/benchmarks/CMakeLists.txt | 2 +
cpp/velox/benchmarks/VeloxBatchResizerBenchmark.cc | 693 +++++++++++++++++++++
cpp/velox/jni/VeloxJniWrapper.cc | 8 +-
cpp/velox/tests/VeloxBatchResizerTest.cc | 422 +++++++++++++
cpp/velox/utils/VeloxBatchResizer.cc | 141 ++++-
cpp/velox/utils/VeloxBatchResizer.h | 21 +-
docs/velox-configuration.md | 145 ++---
13 files changed, 1392 insertions(+), 79 deletions(-)
diff --git a/backends-velox/src/main/java/org/apache/gluten/utils/VeloxBatchResizer.java b/backends-velox/src/main/java/org/apache/gluten/utils/VeloxBatchResizer.java
index fec8e05978..5617c4fc47 100644
--- a/backends-velox/src/main/java/org/apache/gluten/utils/VeloxBatchResizer.java
+++ b/backends-velox/src/main/java/org/apache/gluten/utils/VeloxBatchResizer.java
@@ -31,6 +31,7 @@ public final class VeloxBatchResizer {
int minOutputBatchSize,
int maxOutputBatchSize,
long preferredBatchBytes,
+ boolean enableCopyRanges,
Iterator<ColumnarBatch> in) {
final Runtime runtime =
Runtimes.contextInstance(BackendsApiManager.getBackendName(),
"VeloxBatchResizer");
@@ -40,6 +41,7 @@ public final class VeloxBatchResizer {
minOutputBatchSize,
maxOutputBatchSize,
preferredBatchBytes,
+ enableCopyRanges,
new
ColumnarBatchInIterator(BackendsApiManager.getBackendName(), in));
return new ColumnarBatchOutIterator(runtime, outHandle);
}
diff --git a/backends-velox/src/main/java/org/apache/gluten/utils/VeloxBatchResizerJniWrapper.java b/backends-velox/src/main/java/org/apache/gluten/utils/VeloxBatchResizerJniWrapper.java
index e5b558e97d..908b6a2445 100644
--- a/backends-velox/src/main/java/org/apache/gluten/utils/VeloxBatchResizerJniWrapper.java
+++ b/backends-velox/src/main/java/org/apache/gluten/utils/VeloxBatchResizerJniWrapper.java
@@ -40,5 +40,6 @@ public class VeloxBatchResizerJniWrapper implements RuntimeAware {
int minOutputBatchSize,
int maxOutputBatchSize,
long preferredBatchBytes,
+ boolean enableCopyRanges,
ColumnarBatchInIterator itr);
}
diff --git a/backends-velox/src/main/scala/org/apache/gluten/config/VeloxConfig.scala b/backends-velox/src/main/scala/org/apache/gluten/config/VeloxConfig.scala
index 9ea203d42b..52c964dfe2 100644
--- a/backends-velox/src/main/scala/org/apache/gluten/config/VeloxConfig.scala
+++ b/backends-velox/src/main/scala/org/apache/gluten/config/VeloxConfig.scala
@@ -40,6 +40,9 @@ class VeloxConfig(conf: SQLConf) extends GlutenConfig(conf) {
def enableHashShuffleReaderStreamMerge: Boolean =
getConf(COLUMNAR_VELOX_HASH_SHUFFLE_READER_STREAM_MERGE_ENABLED)
+ def enableVeloxResizeBatchesCopyRanges: Boolean =
+ getConf(COLUMNAR_VELOX_RESIZE_BATCHES_COPY_RANGES_ENABLED)
+
case class ResizeRange(min: Int, max: Int) {
assert(max >= min)
assert(min > 0, "Min batch size should be larger than 0")
@@ -339,6 +342,24 @@ object VeloxConfig extends ConfigRegistry {
.booleanConf
.createWithDefault(false)
+ val COLUMNAR_VELOX_RESIZE_BATCHES_COPY_RANGES_ENABLED =
+
buildConf("spark.gluten.sql.columnar.backend.velox.resizeBatches.copyRanges.enabled")
+ .doc(
+ "Enables a VeloxResizeBatchesExec fast path that combines eligible
batches using " +
+ "Velox vector copyRanges instead of generic RowVector append. When
possible, it " +
+ "collects the small input batches for one VeloxResizeBatchesExec
output, allocates " +
+ "the output RowVector once, and bulk-copies child vector ranges.
This is most useful " +
+ "for shuffle-read outputs where plain hash shuffle payloads are
materialized as " +
+ "dense flat vectors. Complex vectors can also use copyRanges, but
ARRAY and MAP " +
+ "still rebuild nested offsets and sizes while bulk-copying child
ranges. Unsupported " +
+ "encodings such as dictionary and constant vectors fall back to the
generic copy " +
+ "path. This option is enabled by default and complements the
reader-side raw " +
+ "payload merge fast path: that path avoids materializing small plain
payload " +
+ "batches, while this option optimizes VeloxResizeBatchesExec when
that operator " +
+ "is enabled.")
+ .booleanConf
+ .createWithDefault(true)
+
val COLUMNAR_VELOX_RESIZE_BATCHES_SHUFFLE_INPUT_MIN_SIZE =
buildConf("spark.gluten.sql.columnar.backend.velox.resizeBatches.shuffleInput.minSize")
.doc(
diff --git a/backends-velox/src/main/scala/org/apache/gluten/execution/VeloxResizeBatchesExec.scala b/backends-velox/src/main/scala/org/apache/gluten/execution/VeloxResizeBatchesExec.scala
index 3b2c9490e7..0ca76bd97a 100644
--- a/backends-velox/src/main/scala/org/apache/gluten/execution/VeloxResizeBatchesExec.scala
+++ b/backends-velox/src/main/scala/org/apache/gluten/execution/VeloxResizeBatchesExec.scala
@@ -17,6 +17,7 @@
package org.apache.gluten.execution
import org.apache.gluten.backendsapi.velox.VeloxBatchType
+import org.apache.gluten.config.VeloxConfig
import org.apache.gluten.extension.columnar.transition.Convention
import org.apache.gluten.iterator.ClosableIterator
import org.apache.gluten.utils.VeloxBatchResizer
@@ -41,7 +42,12 @@ case class VeloxResizeBatchesExec(
override protected def mapIterator(in: Iterator[ColumnarBatch]):
Iterator[ColumnarBatch] = {
VeloxBatchResizer
- .create(minOutputBatchSize, maxOutputBatchSize, preferredBatchBytes,
in.asJava)
+ .create(
+ minOutputBatchSize,
+ maxOutputBatchSize,
+ preferredBatchBytes,
+ VeloxConfig.get.enableVeloxResizeBatchesCopyRanges,
+ in.asJava)
.asScala
}
diff --git a/backends-velox/src/test/scala/org/apache/gluten/config/AllVeloxConfiguration.scala b/backends-velox/src/test/scala/org/apache/gluten/config/AllVeloxConfiguration.scala
index 65059972b9..057a3124e7 100644
--- a/backends-velox/src/test/scala/org/apache/gluten/config/AllVeloxConfiguration.scala
+++ b/backends-velox/src/test/scala/org/apache/gluten/config/AllVeloxConfiguration.scala
@@ -81,4 +81,9 @@ class AllVeloxConfiguration extends AnyFunSuite {
builder.toMarkdown,
"dev/gen-all-config-docs.sh")
}
+
+ test("Velox resize batches copyRanges is enabled by default") {
+ assert(
+
VeloxConfig.COLUMNAR_VELOX_RESIZE_BATCHES_COPY_RANGES_ENABLED.defaultValue.contains(true))
+ }
}
diff --git a/cpp/core/shuffle/Payload.h b/cpp/core/shuffle/Payload.h
index eb130c6375..bb7e2ea1d5 100644
--- a/cpp/core/shuffle/Payload.h
+++ b/cpp/core/shuffle/Payload.h
@@ -55,6 +55,8 @@ class Payload {
return numRows_;
}
+ // Marks buffers merged with bit-level row offsets, including null validity
+ // bitmaps and CPU boolean value bitmaps.
const std::vector<bool>* isValidityBuffer() const {
return isValidityBuffer_;
}
diff --git a/cpp/velox/benchmarks/CMakeLists.txt b/cpp/velox/benchmarks/CMakeLists.txt
index e56627466c..fdfd7bda25 100644
--- a/cpp/velox/benchmarks/CMakeLists.txt
+++ b/cpp/velox/benchmarks/CMakeLists.txt
@@ -35,3 +35,5 @@ add_velox_benchmark(generic_benchmark GenericBenchmark.cc)
add_velox_benchmark(parquet_write_benchmark ParquetWriteBenchmark.cc)
add_velox_benchmark(plan_validator_util PlanValidatorUtil.cc)
+
+add_velox_benchmark(velox_batch_resizer_benchmark
VeloxBatchResizerBenchmark.cc)
diff --git a/cpp/velox/benchmarks/VeloxBatchResizerBenchmark.cc b/cpp/velox/benchmarks/VeloxBatchResizerBenchmark.cc
new file mode 100644
index 0000000000..7d1c9b8f4d
--- /dev/null
+++ b/cpp/velox/benchmarks/VeloxBatchResizerBenchmark.cc
@@ -0,0 +1,693 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <algorithm>
+#include <limits>
+#include <memory>
+#include <optional>
+#include <string>
+#include <vector>
+
+#include <arrow/util/bit_util.h>
+#include <arrow/util/bitmap_ops.h>
+#include <benchmark/benchmark.h>
+
+#include "memory/ColumnarBatchIterator.h"
+#include "memory/VeloxColumnarBatch.h"
+#include "shuffle/Payload.h"
+#include "utils/Exception.h"
+#include "utils/VeloxBatchResizer.h"
+#include "velox/common/memory/Memory.h"
+#include "velox/vector/BaseVector.h"
+#include "velox/vector/ComplexVector.h"
+#include "velox/vector/FlatVector.h"
+
+using namespace facebook::velox;
+
+namespace gluten {
+namespace {
+
+constexpr int32_t kInputBatches = 64;
+constexpr int32_t kRowsPerBatch = 64;
+constexpr int32_t kTotalRows = kInputBatches * kRowsPerBatch;
+constexpr int64_t kPreferredBatchBytes = std::numeric_limits<int64_t>::max();
+
+enum class DenseVectorKind {
+ kMixed,
+ kFixedWidth,
+ kStringOnly,
+ kBoolHeavy,
+};
+
+struct DenseBenchmarkScenario {
+ int32_t inputBatches;
+ int32_t rowsPerBatch;
+ DenseVectorKind kind;
+ int32_t fixedWidthColumns;
+ int32_t stringBytes;
+ int32_t boolColumns;
+ bool nullable;
+};
+
+constexpr DenseBenchmarkScenario kMixed64x64{kInputBatches, kRowsPerBatch,
DenseVectorKind::kMixed, 0, 16, 1, true};
+constexpr DenseBenchmarkScenario kMixed16x256{16, 256,
DenseVectorKind::kMixed, 0, 16, 1, true};
+constexpr DenseBenchmarkScenario kMixed256x16{256, 16,
DenseVectorKind::kMixed, 0, 16, 1, true};
+constexpr DenseBenchmarkScenario
+ kFixed2_64x64{kInputBatches, kRowsPerBatch, DenseVectorKind::kFixedWidth,
2, 0, 0, false};
+constexpr DenseBenchmarkScenario
+ kFixed16_64x64{kInputBatches, kRowsPerBatch, DenseVectorKind::kFixedWidth,
16, 0, 0, false};
+constexpr DenseBenchmarkScenario
+ kLongString64x64{kInputBatches, kRowsPerBatch,
DenseVectorKind::kStringOnly, 0, 64, 0, false};
+constexpr DenseBenchmarkScenario
+ kBoolHeavy64x64{kInputBatches, kRowsPerBatch, DenseVectorKind::kBoolHeavy,
0, 0, 8, false};
+
+enum class EncodedVectorKind {
+ kDictionary,
+ kConstant,
+};
+
+struct EncodedBenchmarkScenario {
+ int32_t inputBatches;
+ int32_t rowsPerBatch;
+ EncodedVectorKind kind;
+ int32_t columns;
+};
+
+constexpr EncodedBenchmarkScenario kDictionaryHeavy64x64{
+ kInputBatches,
+ kRowsPerBatch,
+ EncodedVectorKind::kDictionary,
+ 8,
+};
+constexpr EncodedBenchmarkScenario kConstantHeavy64x64{
+ kInputBatches,
+ kRowsPerBatch,
+ EncodedVectorKind::kConstant,
+ 8,
+};
+
+class ColumnarBatchArray : public ColumnarBatchIterator {
+ public:
+ explicit ColumnarBatchArray(std::vector<std::shared_ptr<ColumnarBatch>>
batches) : batches_(std::move(batches)) {}
+
+ std::shared_ptr<ColumnarBatch> next() override {
+ if (cursor_ >= batches_.size()) {
+ return nullptr;
+ }
+ return batches_[cursor_++];
+ }
+
+ private:
+ std::vector<std::shared_ptr<ColumnarBatch>> batches_;
+ size_t cursor_{0};
+};
+
+std::string makeStringValue(int32_t value, int32_t bytes) {
+ auto stringValue = std::to_string(value);
+ if (stringValue.size() < bytes) {
+ stringValue.append(bytes - stringValue.size(), 'x');
+ }
+ return stringValue;
+}
+
+RowVectorPtr makeMixedVector(memory::MemoryPool* pool, const
DenseBenchmarkScenario& scenario, int32_t start) {
+ const auto rows = scenario.rowsPerBatch;
+ auto i32 = BaseVector::create<FlatVector<int32_t>>(INTEGER(), rows, pool);
+ auto i64 = BaseVector::create<FlatVector<int64_t>>(BIGINT(), rows, pool);
+ auto flag = BaseVector::create<FlatVector<bool>>(BOOLEAN(), rows, pool);
+ auto str = BaseVector::create<FlatVector<StringView>>(VARCHAR(), rows, pool);
+
+ for (auto row = 0; row < rows; ++row) {
+ const auto value = start + row;
+ i32->set(row, value);
+ if (scenario.nullable && row % 7 == 0) {
+ i64->setNull(row, true);
+ } else {
+ i64->set(row, value);
+ }
+ flag->set(row, row % 2 == 0);
+ const auto stringValue = makeStringValue(value, scenario.stringBytes);
+ str->set(row, StringView(stringValue));
+ }
+
+ return std::make_shared<RowVector>(
+ pool,
+ ROW({INTEGER(), BIGINT(), BOOLEAN(), VARCHAR()}),
+ nullptr,
+ rows,
+ std::vector<VectorPtr>{i32, i64, flag, str});
+}
+
+RowVectorPtr makeFixedWidthVector(memory::MemoryPool* pool, const
DenseBenchmarkScenario& scenario, int32_t start) {
+ const auto rows = scenario.rowsPerBatch;
+ std::vector<VectorPtr> children;
+ std::vector<TypePtr> types;
+ children.reserve(scenario.fixedWidthColumns);
+ types.reserve(scenario.fixedWidthColumns);
+ for (auto channel = 0; channel < scenario.fixedWidthColumns; ++channel) {
+ auto vector = BaseVector::create<FlatVector<int64_t>>(BIGINT(), rows,
pool);
+ for (auto row = 0; row < rows; ++row) {
+ vector->set(row, static_cast<int64_t>(start + row + channel));
+ }
+ children.push_back(std::move(vector));
+ types.push_back(BIGINT());
+ }
+
+ return std::make_shared<RowVector>(pool, ROW(std::move(types)), nullptr,
rows, std::move(children));
+}
+
+RowVectorPtr makeStringVector(memory::MemoryPool* pool, const
DenseBenchmarkScenario& scenario, int32_t start) {
+ const auto rows = scenario.rowsPerBatch;
+ auto str = BaseVector::create<FlatVector<StringView>>(VARCHAR(), rows, pool);
+ for (auto row = 0; row < rows; ++row) {
+ const auto value = start + row;
+ const auto stringValue = makeStringValue(value, scenario.stringBytes);
+ str->set(row, StringView(stringValue));
+ }
+
+ return std::make_shared<RowVector>(pool, ROW({VARCHAR()}), nullptr, rows,
std::vector<VectorPtr>{str});
+}
+
+RowVectorPtr makeBoolHeavyVector(memory::MemoryPool* pool, const
DenseBenchmarkScenario& scenario, int32_t start) {
+ const auto rows = scenario.rowsPerBatch;
+ std::vector<VectorPtr> children;
+ std::vector<TypePtr> types;
+ children.reserve(scenario.boolColumns);
+ types.reserve(scenario.boolColumns);
+ for (auto channel = 0; channel < scenario.boolColumns; ++channel) {
+ auto vector = BaseVector::create<FlatVector<bool>>(BOOLEAN(), rows, pool);
+ for (auto row = 0; row < rows; ++row) {
+ vector->set(row, (start + row + channel) % 2 == 0);
+ }
+ children.push_back(std::move(vector));
+ types.push_back(BOOLEAN());
+ }
+
+ return std::make_shared<RowVector>(pool, ROW(std::move(types)), nullptr,
rows, std::move(children));
+}
+
+RowVectorPtr makeDenseVector(memory::MemoryPool* pool, const
DenseBenchmarkScenario& scenario, int32_t start) {
+ switch (scenario.kind) {
+ case DenseVectorKind::kMixed:
+ return makeMixedVector(pool, scenario, start);
+ case DenseVectorKind::kFixedWidth:
+ return makeFixedWidthVector(pool, scenario, start);
+ case DenseVectorKind::kStringOnly:
+ return makeStringVector(pool, scenario, start);
+ case DenseVectorKind::kBoolHeavy:
+ return makeBoolHeavyVector(pool, scenario, start);
+ }
+ VELOX_UNREACHABLE();
+}
+
+std::vector<RowVectorPtr> makeSmallVectors(memory::MemoryPool* pool, const
DenseBenchmarkScenario& scenario) {
+ std::vector<RowVectorPtr> vectors;
+ vectors.reserve(scenario.inputBatches);
+ for (auto batch = 0; batch < scenario.inputBatches; ++batch) {
+ vectors.push_back(makeDenseVector(pool, scenario, batch *
scenario.rowsPerBatch));
+ }
+ return vectors;
+}
+
+RowVectorPtr
+makeDictionaryHeavyVector(memory::MemoryPool* pool, const
EncodedBenchmarkScenario& scenario, int32_t start) {
+ const auto rows = scenario.rowsPerBatch;
+ const auto dictionarySize = std::max<int32_t>(1, rows / 4);
+ std::vector<VectorPtr> children;
+ std::vector<TypePtr> types;
+ children.reserve(scenario.columns);
+ types.reserve(scenario.columns);
+ for (auto channel = 0; channel < scenario.columns; ++channel) {
+ auto base = BaseVector::create<FlatVector<int64_t>>(BIGINT(),
dictionarySize, pool);
+ for (auto row = 0; row < dictionarySize; ++row) {
+ base->set(row, static_cast<int64_t>(start + row + channel));
+ }
+
+ auto indices = allocateIndices(rows, pool);
+ auto* rawIndices = indices->asMutable<vector_size_t>();
+ for (auto row = 0; row < rows; ++row) {
+ rawIndices[row] = (start + row + channel) % dictionarySize;
+ }
+ children.push_back(BaseVector::wrapInDictionary(nullptr,
std::move(indices), rows, std::move(base)));
+ types.push_back(BIGINT());
+ }
+
+ return std::make_shared<RowVector>(pool, ROW(std::move(types)), nullptr,
rows, std::move(children));
+}
+
+RowVectorPtr
+makeConstantHeavyVector(memory::MemoryPool* pool, const
EncodedBenchmarkScenario& scenario, int32_t start) {
+ const auto rows = scenario.rowsPerBatch;
+ std::vector<VectorPtr> children;
+ std::vector<TypePtr> types;
+ children.reserve(scenario.columns);
+ types.reserve(scenario.columns);
+ for (auto channel = 0; channel < scenario.columns; ++channel) {
+ children.push_back(BaseVector::createConstant(BIGINT(),
static_cast<int64_t>(start + channel), rows, pool));
+ types.push_back(BIGINT());
+ }
+
+ return std::make_shared<RowVector>(pool, ROW(std::move(types)), nullptr,
rows, std::move(children));
+}
+
+RowVectorPtr makeEncodedVector(memory::MemoryPool* pool, const
EncodedBenchmarkScenario& scenario, int32_t start) {
+ switch (scenario.kind) {
+ case EncodedVectorKind::kDictionary:
+ return makeDictionaryHeavyVector(pool, scenario, start);
+ case EncodedVectorKind::kConstant:
+ return makeConstantHeavyVector(pool, scenario, start);
+ }
+ VELOX_UNREACHABLE();
+}
+
+std::vector<RowVectorPtr> makeSmallVectors(memory::MemoryPool* pool, const
EncodedBenchmarkScenario& scenario) {
+ std::vector<RowVectorPtr> vectors;
+ vectors.reserve(scenario.inputBatches);
+ for (auto batch = 0; batch < scenario.inputBatches; ++batch) {
+ vectors.push_back(makeEncodedVector(pool, scenario, batch *
scenario.rowsPerBatch));
+ }
+ return vectors;
+}
+
+std::unique_ptr<ColumnarBatchIterator> makeIterator(const
std::vector<RowVectorPtr>& vectors) {
+ std::vector<std::shared_ptr<ColumnarBatch>> batches;
+ batches.reserve(vectors.size());
+ for (const auto& vector : vectors) {
+ batches.push_back(std::make_shared<VeloxColumnarBatch>(vector));
+ }
+ return std::make_unique<ColumnarBatchArray>(std::move(batches));
+}
+
+int64_t totalRows(const DenseBenchmarkScenario& scenario) {
+ return static_cast<int64_t>(scenario.inputBatches) * scenario.rowsPerBatch;
+}
+
+int64_t totalRows(const EncodedBenchmarkScenario& scenario) {
+ return static_cast<int64_t>(scenario.inputBatches) * scenario.rowsPerBatch;
+}
+
+VeloxBatchResizer makeResizeBenchmarkResizer(
+ memory::MemoryPool* pool,
+ int64_t outputBatchSize,
+ std::unique_ptr<ColumnarBatchIterator> iterator,
+ std::optional<bool> enableCopyRanges) {
+ if (enableCopyRanges.has_value()) {
+ return VeloxBatchResizer(
+ pool,
+ outputBatchSize,
+ std::numeric_limits<int32_t>::max(),
+ kPreferredBatchBytes,
+ std::move(iterator),
+ enableCopyRanges.value());
+ }
+ return VeloxBatchResizer(
+ pool, outputBatchSize, std::numeric_limits<int32_t>::max(),
kPreferredBatchBytes, std::move(iterator));
+}
+
+void runResizeBenchmark(
+ benchmark::State& state,
+ const DenseBenchmarkScenario& scenario,
+ std::optional<bool> enableCopyRanges) {
+ auto pool =
memory::memoryManager()->addLeafPool("VeloxBatchResizerBenchmark");
+ const auto vectors = makeSmallVectors(pool.get(), scenario);
+ int64_t rows = 0;
+
+ for (auto _ : state) {
+ auto resizer = makeResizeBenchmarkResizer(pool.get(), totalRows(scenario),
makeIterator(vectors), enableCopyRanges);
+ while (auto out = resizer.next()) {
+ rows += out->numRows();
+ }
+ }
+
+ benchmark::DoNotOptimize(rows);
+ state.SetItemsProcessed(static_cast<int64_t>(state.iterations()) *
totalRows(scenario));
+}
+
+void runFallbackResizeBenchmark(
+ benchmark::State& state,
+ const EncodedBenchmarkScenario& scenario,
+ std::optional<bool> enableCopyRanges) {
+ auto pool =
memory::memoryManager()->addLeafPool("VeloxBatchResizerFallbackBenchmark");
+ const auto vectors = makeSmallVectors(pool.get(), scenario);
+ int64_t rows = 0;
+
+ for (auto _ : state) {
+ auto resizer = makeResizeBenchmarkResizer(pool.get(), totalRows(scenario),
makeIterator(vectors), enableCopyRanges);
+ while (auto out = resizer.next()) {
+ rows += out->numRows();
+ }
+ }
+
+ benchmark::DoNotOptimize(rows);
+ state.SetItemsProcessed(static_cast<int64_t>(state.iterations()) *
totalRows(scenario));
+}
+
+void runDirectChildCopyRangesBenchmark(benchmark::State& state, const
DenseBenchmarkScenario& scenario) {
+ auto pool =
memory::memoryManager()->addLeafPool("VeloxBatchResizerBenchmarkDirectCopy");
+ const auto vectors = makeSmallVectors(pool.get(), scenario);
+ int64_t rows = 0;
+
+ for (auto _ : state) {
+ auto output = RowVector::createEmpty(vectors[0]->type(), pool.get());
+ output->resize(totalRows(scenario));
+ vector_size_t offset = 0;
+ for (const auto& input : vectors) {
+ const BaseVector::CopyRange range{0, offset, input->size()};
+ for (auto channel = 0; channel < input->children().size(); ++channel) {
+
output->childAt(channel)->copyRanges(input->childAt(channel)->loadedVector(),
folly::Range(&range, 1));
+ }
+ offset += input->size();
+ }
+ rows += output->size();
+ benchmark::DoNotOptimize(output);
+ }
+
+ benchmark::DoNotOptimize(rows);
+ state.SetItemsProcessed(static_cast<int64_t>(state.iterations()) *
totalRows(scenario));
+}
+
+std::shared_ptr<arrow::ResizableBuffer>
allocatePayloadBuffer(arrow::MemoryPool* pool, int64_t size) {
+ std::shared_ptr<arrow::ResizableBuffer> buffer;
+ GLUTEN_ASSIGN_OR_THROW(buffer, arrow::AllocateResizableBuffer(size, pool));
+ memset(buffer->mutable_data(), 0x5A, size);
+ return buffer;
+}
+
+std::shared_ptr<arrow::ResizableBuffer>
allocateEmptyPayloadBuffer(arrow::MemoryPool* pool, int64_t size) {
+ std::shared_ptr<arrow::ResizableBuffer> buffer;
+ GLUTEN_ASSIGN_OR_THROW(buffer, arrow::AllocateResizableBuffer(size, pool));
+ return buffer;
+}
+
+void addFixedWidthRawBuffers(
+ arrow::MemoryPool* pool,
+ int32_t rows,
+ int32_t columns,
+ int32_t valueBytes,
+ std::vector<bool>& validityBuffers,
+ std::vector<std::shared_ptr<arrow::Buffer>>& buffers) {
+ for (auto channel = 0; channel < columns; ++channel) {
+ validityBuffers.push_back(true);
+ buffers.push_back(nullptr);
+ validityBuffers.push_back(false);
+ buffers.push_back(allocatePayloadBuffer(pool, rows * valueBytes));
+ }
+}
+
+void addFixedWidthRawLayout(int32_t columns, std::vector<bool>&
validityBuffers) {
+ for (auto channel = 0; channel < columns; ++channel) {
+ validityBuffers.push_back(true);
+ validityBuffers.push_back(false);
+ }
+}
+
+void addStringRawBuffers(
+ arrow::MemoryPool* pool,
+ int32_t rows,
+ int32_t stringBytes,
+ bool nullable,
+ std::vector<bool>& validityBuffers,
+ std::vector<std::shared_ptr<arrow::Buffer>>& buffers) {
+ validityBuffers.push_back(true);
+ buffers.push_back(nullable ? allocatePayloadBuffer(pool,
arrow::bit_util::BytesForBits(rows)) : nullptr);
+ validityBuffers.push_back(false);
+ buffers.push_back(allocatePayloadBuffer(pool, rows * sizeof(int32_t)));
+ validityBuffers.push_back(false);
+ buffers.push_back(allocatePayloadBuffer(pool, rows * stringBytes));
+}
+
+void addStringRawLayout(std::vector<bool>& validityBuffers) {
+ validityBuffers.push_back(true);
+ validityBuffers.push_back(false);
+ validityBuffers.push_back(false);
+}
+
+void addBoolRawBuffers(
+ arrow::MemoryPool* pool,
+ int32_t rows,
+ int32_t columns,
+ std::vector<bool>& validityBuffers,
+ std::vector<std::shared_ptr<arrow::Buffer>>& buffers) {
+ for (auto channel = 0; channel < columns; ++channel) {
+ validityBuffers.push_back(true);
+ buffers.push_back(nullptr);
+ validityBuffers.push_back(true);
+ buffers.push_back(allocatePayloadBuffer(pool,
arrow::bit_util::BytesForBits(rows)));
+ }
+}
+
+void addBoolRawLayout(int32_t columns, std::vector<bool>& validityBuffers) {
+ for (auto channel = 0; channel < columns; ++channel) {
+ validityBuffers.push_back(true);
+ validityBuffers.push_back(true);
+ }
+}
+
+std::vector<bool> makeRawPayloadValidityBuffers(const DenseBenchmarkScenario&
scenario) {
+ std::vector<bool> validityBuffers;
+ switch (scenario.kind) {
+ case DenseVectorKind::kMixed:
+ addFixedWidthRawLayout(1, validityBuffers);
+ validityBuffers.push_back(true);
+ validityBuffers.push_back(false);
+ addBoolRawLayout(scenario.boolColumns, validityBuffers);
+ addStringRawLayout(validityBuffers);
+ break;
+ case DenseVectorKind::kFixedWidth:
+ addFixedWidthRawLayout(scenario.fixedWidthColumns, validityBuffers);
+ break;
+ case DenseVectorKind::kStringOnly:
+ addStringRawLayout(validityBuffers);
+ break;
+ case DenseVectorKind::kBoolHeavy:
+ addBoolRawLayout(scenario.boolColumns, validityBuffers);
+ break;
+ }
+ return validityBuffers;
+}
+
+std::unique_ptr<InMemoryPayload> makeRawPayload(
+ arrow::MemoryPool* pool,
+ const DenseBenchmarkScenario& scenario,
+ const std::vector<bool>& validityBuffers) {
+ const auto rows = scenario.rowsPerBatch;
+ std::vector<std::shared_ptr<arrow::Buffer>> buffers;
+ buffers.reserve(validityBuffers.size());
+ std::vector<bool> generatedValidityBuffers;
+ switch (scenario.kind) {
+ case DenseVectorKind::kMixed:
+ addFixedWidthRawBuffers(pool, rows, 1, sizeof(int32_t),
generatedValidityBuffers, buffers);
+ generatedValidityBuffers.push_back(true);
+ buffers.push_back(scenario.nullable ? allocatePayloadBuffer(pool,
arrow::bit_util::BytesForBits(rows)) : nullptr);
+ generatedValidityBuffers.push_back(false);
+ buffers.push_back(allocatePayloadBuffer(pool, rows * sizeof(int64_t)));
+ addBoolRawBuffers(pool, rows, scenario.boolColumns,
generatedValidityBuffers, buffers);
+ addStringRawBuffers(pool, rows, scenario.stringBytes, false,
generatedValidityBuffers, buffers);
+ break;
+ case DenseVectorKind::kFixedWidth:
+ addFixedWidthRawBuffers(
+ pool, rows, scenario.fixedWidthColumns, sizeof(int64_t),
generatedValidityBuffers, buffers);
+ break;
+ case DenseVectorKind::kStringOnly:
+ addStringRawBuffers(pool, rows, scenario.stringBytes, scenario.nullable,
generatedValidityBuffers, buffers);
+ break;
+ case DenseVectorKind::kBoolHeavy:
+ addBoolRawBuffers(pool, rows, scenario.boolColumns,
generatedValidityBuffers, buffers);
+ break;
+ }
+ GLUTEN_CHECK(generatedValidityBuffers == validityBuffers, "Invalid raw
payload buffer layout");
+ return std::make_unique<InMemoryPayload>(rows, &validityBuffers, nullptr,
std::move(buffers));
+}
+
+std::vector<std::unique_ptr<InMemoryPayload>> makeRawPayloads(
+ arrow::MemoryPool* pool,
+ const DenseBenchmarkScenario& scenario,
+ const std::vector<bool>& validityBuffers) {
+ std::vector<std::unique_ptr<InMemoryPayload>> payloads;
+ payloads.reserve(scenario.inputBatches);
+ for (auto batch = 0; batch < scenario.inputBatches; ++batch) {
+ payloads.push_back(makeRawPayload(pool, scenario, validityBuffers));
+ }
+ return payloads;
+}
+
+std::unique_ptr<InMemoryPayload> mergeRawPayloadsBulkCopy(
+ std::vector<std::unique_ptr<InMemoryPayload>> payloads,
+ const std::vector<bool>& validityBuffers,
+ arrow::MemoryPool* pool) {
+ GLUTEN_CHECK(!payloads.empty(), "Cannot merge empty payloads");
+
+ const auto numBuffers = payloads[0]->numBuffers();
+ std::vector<uint32_t> payloadRows;
+ payloadRows.reserve(payloads.size());
+ uint32_t totalRows = 0;
+ std::vector<std::vector<std::shared_ptr<arrow::Buffer>>>
inputBuffers(payloads.size());
+ std::vector<int64_t> outputSizes(numBuffers, 0);
+ std::vector<bool> hasBuffer(numBuffers, false);
+
+ for (auto payloadIdx = 0; payloadIdx < payloads.size(); ++payloadIdx) {
+ const auto rows = payloads[payloadIdx]->numRows();
+ payloadRows.push_back(rows);
+ totalRows += rows;
+ inputBuffers[payloadIdx].reserve(numBuffers);
+ for (auto bufferIdx = 0; bufferIdx < numBuffers; ++bufferIdx) {
+ GLUTEN_ASSIGN_OR_THROW(auto buffer,
payloads[payloadIdx]->readBufferAt(bufferIdx));
+ if (buffer != nullptr) {
+ hasBuffer[bufferIdx] = true;
+ if (validityBuffers[bufferIdx]) {
+ outputSizes[bufferIdx] = arrow::bit_util::BytesForBits(totalRows);
+ } else {
+ outputSizes[bufferIdx] += buffer->size();
+ }
+ }
+ inputBuffers[payloadIdx].push_back(std::move(buffer));
+ }
+ }
+
+ std::vector<std::shared_ptr<arrow::Buffer>> outputBuffers(numBuffers);
+ for (auto bufferIdx = 0; bufferIdx < numBuffers; ++bufferIdx) {
+ if (hasBuffer[bufferIdx]) {
+ outputBuffers[bufferIdx] = allocateEmptyPayloadBuffer(pool,
outputSizes[bufferIdx]);
+ }
+ }
+
+ std::vector<int64_t> byteOffsets(numBuffers, 0);
+ uint32_t rowOffset = 0;
+ for (auto payloadIdx = 0; payloadIdx < inputBuffers.size(); ++payloadIdx) {
+ const auto rows = payloadRows[payloadIdx];
+ for (auto bufferIdx = 0; bufferIdx < numBuffers; ++bufferIdx) {
+ auto& output = outputBuffers[bufferIdx];
+ if (output == nullptr) {
+ continue;
+ }
+
+ const auto& input = inputBuffers[payloadIdx][bufferIdx];
+ if (validityBuffers[bufferIdx]) {
+ if (input == nullptr) {
+ arrow::bit_util::SetBitsTo(output->mutable_data(), rowOffset, rows,
true);
+ } else {
+ arrow::internal::CopyBitmap(input->data(), 0, rows,
output->mutable_data(), rowOffset);
+ }
+ } else if (input != nullptr) {
+ memcpy(output->mutable_data() + byteOffsets[bufferIdx], input->data(),
input->size());
+ byteOffsets[bufferIdx] += input->size();
+ }
+ }
+ rowOffset += rows;
+ }
+
+ return std::make_unique<InMemoryPayload>(totalRows, &validityBuffers,
nullptr, std::move(outputBuffers));
+}
+
+// Benchmark entry point for dense scenarios that forwards to runResizeBenchmark
+// with `false` as the third argument. NOTE(review): that argument is presumably
+// the copyRanges switch (opt-out, i.e. the append path); the helper is defined
+// outside this view - confirm.
+void BM_VeloxBatchResizerAppendOptOutBaseline(benchmark::State& state,
DenseBenchmarkScenario scenario) {
+ runResizeBenchmark(state, scenario, false);
+}
+
+// Benchmark entry point for dense scenarios that forwards to runResizeBenchmark
+// with std::nullopt as the third argument. NOTE(review): presumably "no override",
+// so the resizer's default copyRanges setting is measured - confirm against the
+// helper, which is defined outside this view.
+void BM_VeloxBatchResizerDefaultCopyRanges(benchmark::State& state,
DenseBenchmarkScenario scenario) {
+ runResizeBenchmark(state, scenario, std::nullopt);
+}
+
+// Benchmark entry point for encoded (fallback) scenarios that forwards to
+// runFallbackResizeBenchmark with `false` as the third argument.
+// NOTE(review): presumably the copyRanges opt-out baseline - confirm against the
+// helper, which is defined outside this view.
+void BM_VeloxBatchResizerFallbackAppendOptOutBaseline(benchmark::State& state,
EncodedBenchmarkScenario scenario) {
+ runFallbackResizeBenchmark(state, scenario, false);
+}
+
+// Benchmark entry point for encoded (fallback) scenarios that forwards to
+// runFallbackResizeBenchmark with std::nullopt as the third argument.
+// NOTE(review): presumably measures the default configuration on inputs that
+// cannot take the copyRanges fast path - confirm against the helper.
+void BM_VeloxBatchResizerDefaultCopyRangesFallback(benchmark::State& state,
EncodedBenchmarkScenario scenario) {
+ runFallbackResizeBenchmark(state, scenario, std::nullopt);
+}
+
+// Benchmark entry point for dense scenarios that forwards unchanged to
+// runDirectChildCopyRangesBenchmark (defined outside this view).
+void BM_DirectChildCopyRanges(benchmark::State& state, DenseBenchmarkScenario
scenario) {
+ runDirectChildCopyRangesBenchmark(state, scenario);
+}
+
+// Benchmarks merging a scenario's raw payloads with mergeRawPayloadsBulkCopy.
+// Payload construction is excluded from the timed region; only the merge and
+// the row-count read are measured.
+void BM_ReaderSideRawPayloadBulkCopyModel(benchmark::State& state, DenseBenchmarkScenario scenario) {
+  arrow::MemoryPool* arrowPool = arrow::default_memory_pool();
+  const auto validity = makeRawPayloadValidityBuffers(scenario);
+  int64_t mergedRowCount = 0;
+
+  for (auto _ : state) {
+    // Rebuild the inputs outside the timed region: the merge consumes them.
+    state.PauseTiming();
+    auto inputs = makeRawPayloads(arrowPool, scenario, validity);
+    state.ResumeTiming();
+
+    auto mergedPayload = mergeRawPayloadsBulkCopy(std::move(inputs), validity, arrowPool);
+    mergedRowCount += mergedPayload->numRows();
+    // Keep the merged payload observable so the merge is not optimized away.
+    benchmark::DoNotOptimize(mergedPayload);
+  }
+
+  benchmark::DoNotOptimize(mergedRowCount);
+  const auto iterationCount = static_cast<int64_t>(state.iterations());
+  state.SetItemsProcessed(iterationCount * totalRows(scenario));
+}
+
+// Benchmarks the resizer when it is fed one batch that already holds all of
+// the scenario's rows. The single input vector is built once, outside the
+// benchmark loop; resizer construction and draining are inside the loop.
+void BM_ReaderSidePreMergedBatchModel(benchmark::State& state, DenseBenchmarkScenario scenario) {
+  auto leafPool = memory::memoryManager()->addLeafPool("VeloxBatchResizerBenchmarkRawMergeModel");
+
+  // Collapse the scenario into a single batch carrying every row.
+  auto singleBatchScenario = scenario;
+  singleBatchScenario.inputBatches = 1;
+  singleBatchScenario.rowsPerBatch = totalRows(scenario);
+  const std::vector<RowVectorPtr> preMerged{makeDenseVector(leafPool.get(), singleBatchScenario, 0)};
+
+  int64_t emittedRows = 0;
+  for (auto _ : state) {
+    VeloxBatchResizer resizer(
+        leafPool.get(),
+        totalRows(scenario),
+        std::numeric_limits<int32_t>::max(),
+        kPreferredBatchBytes,
+        makeIterator(preMerged),
+        false);
+    while (auto batch = resizer.next()) {
+      emittedRows += batch->numRows();
+    }
+  }
+
+  benchmark::DoNotOptimize(emittedRows);
+  const auto iterationCount = static_cast<int64_t>(state.iterations());
+  state.SetItemsProcessed(iterationCount * totalRows(scenario));
+}
+
+// Registers the five dense-input benchmarks for one scenario under a shared
+// capture name.
+#define REGISTER_DENSE_SCENARIO_BENCHMARKS(name, scenario)
\
+ BENCHMARK_CAPTURE(BM_VeloxBatchResizerAppendOptOutBaseline, name, scenario);
\
+ BENCHMARK_CAPTURE(BM_VeloxBatchResizerDefaultCopyRanges, name, scenario);
\
+ BENCHMARK_CAPTURE(BM_DirectChildCopyRanges, name, scenario);
\
+ BENCHMARK_CAPTURE(BM_ReaderSideRawPayloadBulkCopyModel, name, scenario);
\
+ BENCHMARK_CAPTURE(BM_ReaderSidePreMergedBatchModel, name, scenario)
+
+// Registers the two fallback benchmarks for one encoded scenario.
+#define REGISTER_FALLBACK_SCENARIO_BENCHMARKS(name, scenario)
\
+ BENCHMARK_CAPTURE(BM_VeloxBatchResizerFallbackAppendOptOutBaseline, name,
scenario); \
+ BENCHMARK_CAPTURE(BM_VeloxBatchResizerDefaultCopyRangesFallback, name,
scenario)
+
+// Scenario registrations; the scenario constants are defined outside this view.
+REGISTER_DENSE_SCENARIO_BENCHMARKS(Mixed_64x64, kMixed64x64);
+REGISTER_DENSE_SCENARIO_BENCHMARKS(Mixed_16x256, kMixed16x256);
+REGISTER_DENSE_SCENARIO_BENCHMARKS(Mixed_256x16, kMixed256x16);
+REGISTER_DENSE_SCENARIO_BENCHMARKS(Fixed2_64x64, kFixed2_64x64);
+REGISTER_DENSE_SCENARIO_BENCHMARKS(Fixed16_64x64, kFixed16_64x64);
+REGISTER_DENSE_SCENARIO_BENCHMARKS(LongString_64x64, kLongString64x64);
+REGISTER_DENSE_SCENARIO_BENCHMARKS(BoolHeavy_64x64, kBoolHeavy64x64);
+REGISTER_FALLBACK_SCENARIO_BENCHMARKS(DictionaryHeavy_64x64,
kDictionaryHeavy64x64);
+REGISTER_FALLBACK_SCENARIO_BENCHMARKS(ConstantHeavy_64x64,
kConstantHeavy64x64);
+
+// The registration macros are only needed above; undefine them.
+#undef REGISTER_DENSE_SCENARIO_BENCHMARKS
+#undef REGISTER_FALLBACK_SCENARIO_BENCHMARKS
+
+} // namespace
+} // namespace gluten
+
+// Benchmark entry point: initializes the Velox memory manager, then runs the
+// registered Google Benchmark cases. Returns 1 if the command line contains
+// arguments Google Benchmark does not recognize, 0 otherwise.
+int main(int argc, char** argv) {
+  // The memory manager is initialized before any benchmark body runs.
+  facebook::velox::memory::MemoryManager::initialize(facebook::velox::memory::MemoryManager::Options{});
+  ::benchmark::Initialize(&argc, argv);
+  // Initialize() strips the flags it understands; reject whatever is left so a
+  // mistyped --benchmark_* flag is reported instead of silently ignored.
+  if (::benchmark::ReportUnrecognizedArguments(argc, argv)) {
+    return 1;
+  }
+  ::benchmark::RunSpecifiedBenchmarks();
+  ::benchmark::Shutdown();
+  return 0;
+}
diff --git a/cpp/velox/jni/VeloxJniWrapper.cc b/cpp/velox/jni/VeloxJniWrapper.cc
index 72640b21af..aa4d959943 100644
--- a/cpp/velox/jni/VeloxJniWrapper.cc
+++ b/cpp/velox/jni/VeloxJniWrapper.cc
@@ -464,13 +464,19 @@ JNIEXPORT jlong JNICALL
Java_org_apache_gluten_utils_VeloxBatchResizerJniWrapper
jint minOutputBatchSize,
jint maxOutputBatchSize,
jlong preferredBatchBytes,
+ jboolean enableCopyRanges,
jobject jIter) {
JNI_METHOD_START
auto ctx = getRuntime(env, wrapper);
auto pool =
dynamic_cast<VeloxMemoryManager*>(ctx->memoryManager())->getLeafMemoryPool();
auto iter = makeJniColumnarBatchIterator(env, jIter, ctx);
auto appender =
std::make_shared<ResultIterator>(std::make_unique<VeloxBatchResizer>(
- pool.get(), minOutputBatchSize, maxOutputBatchSize, preferredBatchBytes,
std::move(iter)));
+ pool.get(),
+ minOutputBatchSize,
+ maxOutputBatchSize,
+ preferredBatchBytes,
+ std::move(iter),
+ enableCopyRanges == JNI_TRUE));
return ctx->saveObject(appender);
JNI_METHOD_END(kInvalidObjectHandle)
}
diff --git a/cpp/velox/tests/VeloxBatchResizerTest.cc
b/cpp/velox/tests/VeloxBatchResizerTest.cc
index b23606b1c8..3d850fac05 100644
--- a/cpp/velox/tests/VeloxBatchResizerTest.cc
+++ b/cpp/velox/tests/VeloxBatchResizerTest.cc
@@ -16,6 +16,9 @@
*/
#include <limits>
+#include <optional>
+#include <string>
+#include <vector>
#include "utils/VeloxBatchResizer.h"
#include "velox/vector/tests/utils/VectorTestBase.h"
@@ -53,6 +56,138 @@ class VeloxBatchResizerTest : public ::testing::Test,
public test::VectorTestBas
return out;
}
+ // Builds a row vector with four flat children:
+ //   i32  - start, start + 1, ...
+ //   i64  - null on every third row, otherwise start + row
+ //   flag - true on even rows
+ //   str  - "long-string-value-<start + row>"
+ RowVectorPtr newDenseFlatVector(size_t numRows, int32_t start = 0) {
+   std::vector<std::optional<int64_t>> nullableValues(numRows);
+   std::vector<std::string> strings(numRows);
+   for (size_t row = 0; row < numRows; ++row) {
+     const int32_t value = start + static_cast<int32_t>(row);
+     // Entries start out as nullopt; only non-null rows are overwritten.
+     if (row % 3 != 0) {
+       nullableValues[row] = value;
+     }
+     strings[row] = "long-string-value-" + std::to_string(value);
+   }
+
+   const auto sequenceAt = [start](auto row) { return start + row; };
+   const auto isEvenRow = [](auto row) { return row % 2 == 0; };
+   const auto stringAt = [&strings](auto row) { return StringView(strings[row]); };
+   return makeRowVector(
+       {"i32", "i64", "flag", "str"},
+       {makeFlatVector<int32_t>(numRows, sequenceAt),
+        makeNullableFlatVector<int64_t>(nullableValues),
+        makeFlatVector<bool>(numRows, isEvenRow),
+        makeFlatVector<StringView>(numRows, stringAt)});
+ }
+
+ // Builds a single-column ("flag") row vector of nullable bools. With
+ // key = start + row, the value is null when key % 5 == 0 and otherwise
+ // (key % 3 == 0).
+ RowVectorPtr newNullableBoolVector(size_t numRows, int32_t start = 0) {
+   std::vector<std::optional<bool>> flags(numRows);
+   for (size_t row = 0; row < numRows; ++row) {
+     const int32_t key = start + static_cast<int32_t>(row);
+     // Entries start out as nullopt; only non-null rows are overwritten.
+     if (key % 5 != 0) {
+       flags[row] = (key % 3 == 0);
+     }
+   }
+   auto flagColumn = makeNullableFlatVector<bool>(flags);
+   return makeRowVector({"flag"}, {flagColumn});
+ }
+
+ // Builds a row vector with three flat children, with key = start + row:
+ //   i32  - start, start + 1, ...
+ //   flag - null when key % 7 == 0, otherwise (key % 2 == 0)
+ //   i64  - null when key % 4 == 0, otherwise key
+ RowVectorPtr newDenseFlatVectorWithNullableBool(size_t numRows, int32_t start = 0) {
+   std::vector<std::optional<bool>> nullableFlags(numRows);
+   std::vector<std::optional<int64_t>> nullableValues(numRows);
+   for (size_t row = 0; row < numRows; ++row) {
+     const int32_t key = start + static_cast<int32_t>(row);
+     // Entries start out as nullopt; only non-null rows are overwritten.
+     if (key % 7 != 0) {
+       nullableFlags[row] = (key % 2 == 0);
+     }
+     if (key % 4 != 0) {
+       nullableValues[row] = key;
+     }
+   }
+
+   const auto sequenceAt = [start](auto row) { return start + row; };
+   return makeRowVector(
+       {"i32", "flag", "i64"},
+       {makeFlatVector<int32_t>(numRows, sequenceAt),
+        makeNullableFlatVector<bool>(nullableFlags),
+        makeNullableFlatVector<int64_t>(nullableValues)});
+ }
+
+ // Builds a single-column ("i32") row vector holding start, start + 1, ...
+ RowVectorPtr newFlatIntVector(size_t numRows, int32_t start = 0) {
+   auto values = makeFlatVector<int32_t>(numRows, [start](auto row) { return start + row; });
+   return makeRowVector({"i32"}, {values});
+ }
+
+ // Builds a row vector with an "i32" sequence column and an "arr" column whose
+ // row r holds the two-element array {start + r, start + r + 1}.
+ RowVectorPtr newComplexVector(size_t numRows, int32_t start = 0) {
+   std::vector<std::vector<int32_t>> arrays(numRows);
+   for (size_t row = 0; row < numRows; ++row) {
+     const int32_t first = start + static_cast<int32_t>(row);
+     arrays[row] = std::vector<int32_t>{first, first + 1};
+   }
+
+   const auto sequenceAt = [start](auto row) { return start + row; };
+   return makeRowVector(
+       {"i32", "arr"},
+       {makeFlatVector<int32_t>(numRows, sequenceAt), makeArrayVector<int32_t>(arrays)});
+ }
+
+ // Builds a row vector with an "i32" sequence column and a nullable "arr"
+ // column. Every fifth row (by row index, independent of `start`) is a null
+ // array; other rows hold {start + r, null, start + r + 1}.
+ RowVectorPtr newNullableComplexVector(size_t numRows, int32_t start = 0) {
+   using NullableArray = std::vector<std::optional<int32_t>>;
+   std::vector<std::optional<NullableArray>> arrays(numRows);
+   for (size_t row = 0; row < numRows; ++row) {
+     if (row % 5 == 0) {
+       continue; // stays nullopt: a null array
+     }
+     const int32_t first = start + static_cast<int32_t>(row);
+     arrays[row] = NullableArray{first, std::nullopt, first + 1};
+   }
+
+   const auto sequenceAt = [start](auto row) { return start + row; };
+   return makeRowVector(
+       {"i32", "arr"},
+       {makeFlatVector<int32_t>(numRows, sequenceAt), makeNullableArrayVector<int32_t>(arrays)});
+ }
+
+ // Builds a row vector with an "i32" sequence column and a "map" column whose
+ // row r, with v = start + r, holds the entries {v -> v + 1, v + 2 -> v + 3}.
+ RowVectorPtr newMapVector(size_t numRows, int32_t start = 0) {
+   using Entries = std::vector<std::pair<int32_t, std::optional<int32_t>>>;
+   std::vector<Entries> maps(numRows);
+   for (size_t row = 0; row < numRows; ++row) {
+     const int32_t value = start + static_cast<int32_t>(row);
+     maps[row] = Entries{{value, value + 1}, {value + 2, value + 3}};
+   }
+
+   const auto sequenceAt = [start](auto row) { return start + row; };
+   return makeRowVector(
+       {"i32", "map"},
+       {makeFlatVector<int32_t>(numRows, sequenceAt), makeMapVector<int32_t, int32_t>(maps)});
+ }
+
+ // Builds a row vector with an "i32" sequence column and a nullable "map"
+ // column. Every fifth row (by row index, independent of `start`) is a null
+ // map; other rows, with v = start + r, hold {v -> v + 1, v + 2 -> null}.
+ RowVectorPtr newNullableMapVector(size_t numRows, int32_t start = 0) {
+   using Entries = std::vector<std::pair<int32_t, std::optional<int32_t>>>;
+   std::vector<std::optional<Entries>> maps(numRows);
+   for (size_t row = 0; row < numRows; ++row) {
+     if (row % 5 == 0) {
+       continue; // stays nullopt: a null map
+     }
+     const int32_t value = start + static_cast<int32_t>(row);
+     maps[row] = Entries{{value, value + 1}, {value + 2, std::nullopt}};
+   }
+
+   const auto sequenceAt = [start](auto row) { return start + row; };
+   return makeRowVector(
+       {"i32", "map"},
+       {makeFlatVector<int32_t>(numRows, sequenceAt), makeNullableMapVector<int32_t, int32_t>(maps)});
+ }
+
+ // Builds a single-column ("dict") row vector whose child is a dictionary
+ // wrapper with identity indices over a flat int32 sequence starting at `start`.
+ RowVectorPtr newDictionaryVector(size_t numRows, int32_t start = 0) {
+   auto flatBase = makeFlatVector<int32_t>(numRows, [start](auto row) { return start + row; });
+   auto identityIndices = makeIndices(numRows, [](auto row) { return row; });
+   auto dictionary = wrapInDictionary(identityIndices, numRows, flatBase);
+   return makeRowVector({"dict"}, {dictionary});
+ }
+
+ // Builds a one-column ("i32") RowVector whose row 0 is null at the row level,
+ // i.e. the RowVector itself carries a nulls buffer.
+ RowVectorPtr newTopLevelNullVector(size_t numRows, int32_t start = 0) {
+   auto rowNulls = allocateNulls(numRows, pool());
+   bits::setNull(rowNulls->asMutable<uint64_t>(), 0, true);
+
+   std::vector<VectorPtr> children{makeFlatVector<int32_t>(numRows, [start](auto row) { return start + row; })};
+   // The trailing 1 is the last constructor argument; presumably the null
+   // count, matching the single null set above - confirm against RowVector.
+   return std::make_shared<RowVector>(
+       pool(), ROW({"i32"}, {INTEGER()}), rowNulls, numRows, std::move(children), 1);
+ }
+
void checkResize(
int32_t min,
int32_t max,
@@ -76,6 +211,42 @@ class VeloxBatchResizerTest : public ::testing::Test,
public test::VectorTestBas
}
ASSERT_EQ(actualOutSizes, outSizes);
}
+
+ // Runs the resizer over `vectors` with min batch size 100, no max row limit
+ // and a 10 MiB preferred byte size, and returns the first output batch.
+ // Records a test failure unless exactly one batch was produced.
+ // `enableDenseFlatCopy` and `stats` are forwarded to resizeAll.
+ RowVectorPtr
+ resizeOnce(const std::vector<RowVectorPtr>& vectors, bool enableDenseFlatCopy, VeloxBatchResizeStats* stats) {
+   auto out = resizeAll(vectors, 100, std::numeric_limits<int32_t>::max(), (10L << 20), enableDenseFlatCopy, stats);
+   EXPECT_EQ(out.size(), 1);
+   // EXPECT_EQ is non-fatal, so `out` may still be empty here. at() throws
+   // std::out_of_range in that case, which fails the test, whereas out[0]
+   // would be undefined behaviour.
+   return out.at(0);
+ }
+
+ // Wraps `vectors` as VeloxColumnarBatch inputs, drains a VeloxBatchResizer
+ // built with the given limits, and returns every output batch's RowVector in
+ // order. `enableDenseFlatCopy` and `stats` are passed as the resizer's last
+ // two constructor arguments. An output that is not a VeloxColumnarBatch is
+ // reported as a test failure and skipped.
+ std::vector<RowVectorPtr> resizeAll(
+     const std::vector<RowVectorPtr>& vectors,
+     int32_t minOutputBatchSize,
+     int32_t maxOutputBatchSize,
+     int64_t preferredBatchBytes,
+     bool enableDenseFlatCopy,
+     VeloxBatchResizeStats* stats) {
+   std::vector<std::shared_ptr<ColumnarBatch>> inBatches;
+   inBatches.reserve(vectors.size());
+   for (const auto& vector : vectors) {
+     inBatches.push_back(std::make_shared<VeloxColumnarBatch>(vector));
+   }
+   VeloxBatchResizer resizer(
+       pool(),
+       minOutputBatchSize,
+       maxOutputBatchSize,
+       preferredBatchBytes,
+       std::make_unique<ColumnarBatchArray>(std::move(inBatches)),
+       enableDenseFlatCopy,
+       stats);
+   std::vector<RowVectorPtr> out;
+   while (auto next = resizer.next()) {
+     auto veloxBatch = std::dynamic_pointer_cast<VeloxColumnarBatch>(next);
+     // A non-fatal expectation followed by a dereference would crash on a
+     // failed cast, so record the failure and skip the batch instead.
+     if (veloxBatch == nullptr) {
+       ADD_FAILURE() << "Resizer output is not a VeloxColumnarBatch";
+       continue;
+     }
+     out.push_back(veloxBatch->getRowVector());
+   }
+   return out;
+ }
};
TEST_F(VeloxBatchResizerTest, sanity) {
@@ -100,4 +271,255 @@ TEST_F(VeloxBatchResizerTest, preferredBatchBytesTest) {
ASSERT_ANY_THROW(checkResize(0, 0, 0, {}, {}));
}
+// With the copy flag disabled, two small dense batches are merged into one
+// output and counted as two append copies and zero copyRanges copies.
+// NOTE(review): `expected` is produced by the same disabled path as `actual`,
+// so the vector comparison only checks that the append path is deterministic.
+TEST_F(VeloxBatchResizerTest, denseFlatCopyDisabledUsesAppendPath) {
+ VeloxBatchResizeStats stats;
+ auto vectors = std::vector<RowVectorPtr>{newDenseFlatVector(30, 0),
newDenseFlatVector(40, 100)};
+ auto actual = resizeOnce(vectors, false, &stats);
+ auto appendStats = VeloxBatchResizeStats{};
+ auto expected = resizeOnce(vectors, false, &appendStats);
+ test::assertEqualVectors(expected, actual);
+ EXPECT_EQ(stats.copyRangesBatches, 0);
+ EXPECT_EQ(stats.appendCopyBatches, 2);
+}
+
+// With the copy flag enabled, two dense flat batches (int32, nullable int64,
+// bool, string) must produce the same output as the disabled path, with both
+// inputs counted as copyRanges copies into a single output batch.
+TEST_F(VeloxBatchResizerTest,
denseFlatCopyEnabledUsesCopyRangesForFixedWidthAndString) {
+ VeloxBatchResizeStats stats;
+ auto vectors = std::vector<RowVectorPtr>{newDenseFlatVector(30, 0),
newDenseFlatVector(40, 100)};
+ auto appendStats = VeloxBatchResizeStats{};
+ auto expected = resizeOnce(vectors, false, &appendStats);
+
+ auto actual = resizeOnce(vectors, true, &stats);
+
+ test::assertEqualVectors(expected, actual);
+ EXPECT_EQ(stats.copyRangesBatches, 2);
+ EXPECT_EQ(stats.copyRangesOutputBatches, 1);
+ EXPECT_EQ(stats.appendCopyBatches, 0);
+}
+
+// Merges four nullable-bool batches of 3, 5, 9 and 17 rows, so each batch after
+// the first lands at a target offset that is not a multiple of 8. The enabled
+// path must match the disabled path and count all four inputs as copyRanges
+// copies, with no fallback.
+TEST_F(VeloxBatchResizerTest, copyRangesEnabledHandlesNullableBoolBitmaps) {
+ VeloxBatchResizeStats stats;
+ auto vectors = std::vector<RowVectorPtr>{
+ newNullableBoolVector(3, 0),
+ newNullableBoolVector(5, 100),
+ newNullableBoolVector(9, 200),
+ newNullableBoolVector(17, 300),
+ };
+ auto appendStats = VeloxBatchResizeStats{};
+ auto expected = resizeOnce(vectors, false, &appendStats);
+
+ auto actual = resizeOnce(vectors, true, &stats);
+
+ test::assertEqualVectors(expected, actual);
+ EXPECT_EQ(stats.copyRangesBatches, 4);
+ EXPECT_EQ(stats.copyRangesOutputBatches, 1);
+ EXPECT_EQ(stats.appendCopyBatches, 0);
+ EXPECT_EQ(stats.copyRangesFallbackBatches, 0);
+}
+
+// Same idea as the nullable-bool test, but with three columns (int32, nullable
+// bool, nullable int64) and batches of 1, 6, 10 and 15 rows, so later batches
+// start at target offsets 1, 7 and 17. The enabled path must match the
+// disabled path and use copyRanges for all four inputs.
+TEST_F(VeloxBatchResizerTest,
copyRangesEnabledHandlesMixedNullableBitmapsAtUnalignedOffsets) {
+ VeloxBatchResizeStats stats;
+ auto vectors = std::vector<RowVectorPtr>{
+ newDenseFlatVectorWithNullableBool(1, 0),
+ newDenseFlatVectorWithNullableBool(6, 100),
+ newDenseFlatVectorWithNullableBool(10, 200),
+ newDenseFlatVectorWithNullableBool(15, 300),
+ };
+ auto appendStats = VeloxBatchResizeStats{};
+ auto expected = resizeOnce(vectors, false, &appendStats);
+
+ auto actual = resizeOnce(vectors, true, &stats);
+
+ test::assertEqualVectors(expected, actual);
+ EXPECT_EQ(stats.copyRangesBatches, 4);
+ EXPECT_EQ(stats.copyRangesOutputBatches, 1);
+ EXPECT_EQ(stats.appendCopyBatches, 0);
+ EXPECT_EQ(stats.copyRangesFallbackBatches, 0);
+}
+
+// Calls copyRanges directly on a nullable-bool vector (no resizer involved)
+// with four ranges whose source and target indices mostly fall off byte
+// boundaries. The expected vector is built from an identical target by
+// copying each element with setNull/set.
+TEST_F(VeloxBatchResizerTest,
veloxCopyRangesHandlesNullableBoolBitmapsAtUnalignedOffsets) {
+ auto sourceRow = newNullableBoolVector(48, 100);
+ auto source = sourceRow->childAt(0)->loadedVector();
+ auto actual = newNullableBoolVector(64, 500)->childAt(0);
+ auto expected = newNullableBoolVector(64, 500)->childAt(0);
+ auto expectedFlat = expected->asFlatVector<bool>();
+ auto sourceFlat = source->asFlatVector<bool>();
+ const std::vector<BaseVector::CopyRange> ranges{
+ {.sourceIndex = 1, .targetIndex = 2, .count = 5},
+ {.sourceIndex = 7, .targetIndex = 11, .count = 9},
+ {.sourceIndex = 18, .targetIndex = 24, .count = 13},
+ {.sourceIndex = 33, .targetIndex = 43, .count = 4},
+ };
+ // Build the expectation one element at a time.
+ for (const auto& range : ranges) {
+ for (auto i = 0; i < range.count; ++i) {
+ const auto sourceIndex = range.sourceIndex + i;
+ const auto targetIndex = range.targetIndex + i;
+ if (source->isNullAt(sourceIndex)) {
+ expectedFlat->setNull(targetIndex, true);
+ } else {
+ expectedFlat->set(targetIndex, sourceFlat->valueAt(sourceIndex));
+ }
+ }
+ }
+
+ actual->copyRanges(source, ranges);
+
+ test::assertEqualVectors(expected, actual);
+}
+
+// Batches with an array column (no nulls) must take the copyRanges path: both
+// inputs are counted as copyRanges copies, nothing falls back, and the output
+// equals the disabled-path output.
+TEST_F(VeloxBatchResizerTest, copyRangesEnabledSupportsComplexType) {
+ VeloxBatchResizeStats stats;
+ auto vectors = std::vector<RowVectorPtr>{newComplexVector(30, 0),
newComplexVector(40, 100)};
+ auto appendStats = VeloxBatchResizeStats{};
+ auto expected = resizeOnce(vectors, false, &appendStats);
+
+ auto actual = resizeOnce(vectors, true, &stats);
+
+ test::assertEqualVectors(expected, actual);
+ EXPECT_EQ(stats.copyRangesBatches, 2);
+ EXPECT_EQ(stats.copyRangesOutputBatches, 1);
+ EXPECT_EQ(stats.appendCopyBatches, 0);
+ EXPECT_EQ(stats.copyRangesFallbackBatches, 0);
+}
+
+// Same as the complex-type test, but the array column contains null arrays and
+// null elements. The copyRanges path must still be used for both inputs.
+TEST_F(VeloxBatchResizerTest, copyRangesEnabledSupportsNullableComplexType) {
+ VeloxBatchResizeStats stats;
+ auto vectors = std::vector<RowVectorPtr>{newNullableComplexVector(30, 0),
newNullableComplexVector(40, 100)};
+ auto appendStats = VeloxBatchResizeStats{};
+ auto expected = resizeOnce(vectors, false, &appendStats);
+
+ auto actual = resizeOnce(vectors, true, &stats);
+
+ test::assertEqualVectors(expected, actual);
+ EXPECT_EQ(stats.copyRangesBatches, 2);
+ EXPECT_EQ(stats.copyRangesOutputBatches, 1);
+ EXPECT_EQ(stats.appendCopyBatches, 0);
+ EXPECT_EQ(stats.copyRangesFallbackBatches, 0);
+}
+
+// Batches with a map column (no nulls) must take the copyRanges path for both
+// inputs and produce the same output as the disabled path.
+TEST_F(VeloxBatchResizerTest, copyRangesEnabledSupportsMapType) {
+ VeloxBatchResizeStats stats;
+ auto vectors = std::vector<RowVectorPtr>{newMapVector(30, 0),
newMapVector(40, 100)};
+ auto appendStats = VeloxBatchResizeStats{};
+ auto expected = resizeOnce(vectors, false, &appendStats);
+
+ auto actual = resizeOnce(vectors, true, &stats);
+
+ test::assertEqualVectors(expected, actual);
+ EXPECT_EQ(stats.copyRangesBatches, 2);
+ EXPECT_EQ(stats.copyRangesOutputBatches, 1);
+ EXPECT_EQ(stats.appendCopyBatches, 0);
+ EXPECT_EQ(stats.copyRangesFallbackBatches, 0);
+}
+
+// Same as the map-type test, but the map column contains null maps and null
+// values. The copyRanges path must still be used for both inputs.
+TEST_F(VeloxBatchResizerTest, copyRangesEnabledSupportsNullableMapType) {
+ VeloxBatchResizeStats stats;
+ auto vectors = std::vector<RowVectorPtr>{newNullableMapVector(30, 0),
newNullableMapVector(40, 100)};
+ auto appendStats = VeloxBatchResizeStats{};
+ auto expected = resizeOnce(vectors, false, &appendStats);
+
+ auto actual = resizeOnce(vectors, true, &stats);
+
+ test::assertEqualVectors(expected, actual);
+ EXPECT_EQ(stats.copyRangesBatches, 2);
+ EXPECT_EQ(stats.copyRangesOutputBatches, 1);
+ EXPECT_EQ(stats.appendCopyBatches, 0);
+ EXPECT_EQ(stats.copyRangesFallbackBatches, 0);
+}
+
+// Inputs built by newVector (defined outside this view; presumably
+// constant-encoded, per the test name) must not use copyRanges: both inputs
+// are counted as fallbacks and as generic copies, and the output still equals
+// the disabled-path output.
+TEST_F(VeloxBatchResizerTest, copyRangesEnabledFallsBackForConstantEncoding) {
+ VeloxBatchResizeStats stats;
+ auto vectors = std::vector<RowVectorPtr>{newVector(30), newVector(40)};
+ auto appendStats = VeloxBatchResizeStats{};
+ auto expected = resizeOnce(vectors, false, &appendStats);
+
+ auto actual = resizeOnce(vectors, true, &stats);
+
+ test::assertEqualVectors(expected, actual);
+ EXPECT_EQ(stats.copyRangesBatches, 0);
+ EXPECT_EQ(stats.appendCopyBatches, 2);
+ EXPECT_EQ(stats.copyRangesFallbackBatches, 2);
+}
+
+// Row vectors that carry row-level nulls must not use copyRanges: both inputs
+// fall back to the generic copy, no copyRanges output batch is recorded, and
+// the output equals the disabled-path output.
+TEST_F(VeloxBatchResizerTest, copyRangesEnabledFallsBackForTopLevelNulls) {
+ VeloxBatchResizeStats stats;
+ auto vectors = std::vector<RowVectorPtr>{newTopLevelNullVector(30, 0),
newTopLevelNullVector(40, 100)};
+ auto appendStats = VeloxBatchResizeStats{};
+ auto expected = resizeOnce(vectors, false, &appendStats);
+
+ auto actual = resizeOnce(vectors, true, &stats);
+
+ test::assertEqualVectors(expected, actual);
+ EXPECT_EQ(stats.copyRangesBatches, 0);
+ EXPECT_EQ(stats.copyRangesOutputBatches, 0);
+ EXPECT_EQ(stats.appendCopyBatches, 2);
+ EXPECT_EQ(stats.copyRangesFallbackBatches, 2);
+}
+
+// Mixes eligible and ineligible inputs in one output batch: two flat int
+// batches (30 and 20 rows) around one newVector(40) batch (helper defined
+// outside this view). The 90-row output must equal the disabled-path output,
+// with two inputs counted as copyRanges copies and one as a fallback.
+TEST_F(VeloxBatchResizerTest,
copyRangesEnabledCanMixSmallDenseSparseAndDenseBatches) {
+ VeloxBatchResizeStats stats;
+ auto vectors = std::vector<RowVectorPtr>{newFlatIntVector(30, 0),
newVector(40), newFlatIntVector(20, 100)};
+ auto appendStats = VeloxBatchResizeStats{};
+ auto expected = resizeOnce(vectors, false, &appendStats);
+
+ auto actual = resizeOnce(vectors, true, &stats);
+
+ ASSERT_EQ(actual->size(), expected->size());
+ EXPECT_EQ(actual->size(), 90);
+ test::assertEqualVectors(expected, actual);
+ EXPECT_EQ(stats.copyRangesBatches, 2);
+ EXPECT_EQ(stats.copyRangesOutputBatches, 1);
+ EXPECT_EQ(stats.appendCopyBatches, 1);
+ EXPECT_EQ(stats.copyRangesFallbackBatches, 1);
+}
+
+// With min = 100 and max = 50 rows, no two 40-row inputs can be combined, so
+// each input comes out as its own batch. Only two inputs are counted as
+// copyRanges copies: the input that would exceed the max is handed to the
+// pending slice iterator by collectAndCopy instead of being copied.
+TEST_F(VeloxBatchResizerTest,
copyRangesEnabledFlushesCollectedInputsBeforeSplit) {
+ VeloxBatchResizeStats stats;
+ auto vectors =
+ std::vector<RowVectorPtr>{newFlatIntVector(40, 0), newFlatIntVector(40,
100), newFlatIntVector(40, 200)};
+
+ auto actual = resizeAll(vectors, 100, 50, (10L << 20), true, &stats);
+
+ ASSERT_EQ(actual.size(), 3);
+ test::assertEqualVectors(vectors[0], actual[0]);
+ test::assertEqualVectors(vectors[1], actual[1]);
+ test::assertEqualVectors(vectors[2], actual[2]);
+ EXPECT_EQ(stats.copyRangesBatches, 2);
+ EXPECT_EQ(stats.copyRangesOutputBatches, 2);
+ EXPECT_EQ(stats.appendCopyBatches, 0);
+ EXPECT_EQ(stats.copyRangesFallbackBatches, 0);
+}
+
+// Three inputs totalling 90 rows never reach the 100-row minimum, so the
+// collected inputs must be flushed when the input iterator is exhausted: one
+// 90-row output equal to the disabled-path output, with all three inputs
+// counted as copyRanges copies.
+TEST_F(VeloxBatchResizerTest,
copyRangesEnabledFlushesCollectedInputsAtEndOfInput) {
+ VeloxBatchResizeStats stats;
+ auto vectors =
+ std::vector<RowVectorPtr>{newFlatIntVector(30, 0), newFlatIntVector(40,
100), newFlatIntVector(20, 200)};
+ auto appendStats = VeloxBatchResizeStats{};
+ auto expected = resizeAll(vectors, 100, std::numeric_limits<int32_t>::max(),
(10L << 20), false, &appendStats);
+
+ auto actual = resizeAll(vectors, 100, std::numeric_limits<int32_t>::max(),
(10L << 20), true, &stats);
+
+ ASSERT_EQ(actual.size(), 1);
+ ASSERT_EQ(expected.size(), 1);
+ test::assertEqualVectors(expected[0], actual[0]);
+ EXPECT_EQ(actual[0]->size(), 90);
+ EXPECT_EQ(stats.copyRangesBatches, 3);
+ EXPECT_EQ(stats.copyRangesOutputBatches, 1);
+ EXPECT_EQ(stats.appendCopyBatches, 0);
+ EXPECT_EQ(stats.copyRangesFallbackBatches, 0);
+}
+
+// Dictionary-wrapped children are not eligible for copyRanges: both inputs are
+// counted as fallbacks and as generic copies, and the output equals the
+// disabled-path output.
+TEST_F(VeloxBatchResizerTest, copyRangesEnabledFallsBackForDictionaryEncoding)
{
+ VeloxBatchResizeStats stats;
+ auto vectors = std::vector<RowVectorPtr>{newDictionaryVector(30, 0),
newDictionaryVector(40, 100)};
+ auto appendStats = VeloxBatchResizeStats{};
+ auto expected = resizeOnce(vectors, false, &appendStats);
+
+ auto actual = resizeOnce(vectors, true, &stats);
+
+ test::assertEqualVectors(expected, actual);
+ EXPECT_EQ(stats.copyRangesBatches, 0);
+ EXPECT_EQ(stats.appendCopyBatches, 2);
+ EXPECT_EQ(stats.copyRangesFallbackBatches, 2);
+}
+
} // namespace gluten
diff --git a/cpp/velox/utils/VeloxBatchResizer.cc
b/cpp/velox/utils/VeloxBatchResizer.cc
index a9196a4671..c67b1c7511 100644
--- a/cpp/velox/utils/VeloxBatchResizer.cc
+++ b/cpp/velox/utils/VeloxBatchResizer.cc
@@ -20,6 +20,52 @@
namespace gluten {
namespace {
+// Returns true when `vector` is flat-encoded, or is a ROW/ARRAY/MAP-encoded
+// vector whose children (row fields, array elements, map keys and values) all
+// recursively satisfy the same condition. Null pointers and every other
+// encoding yield false.
+bool supportsCopyRanges(const facebook::velox::VectorPtr& vector) {
+  using facebook::velox::TypeKind;
+  using Encoding = facebook::velox::VectorEncoding::Simple;
+
+  if (vector == nullptr) {
+    return false;
+  }
+  if (vector->isFlatEncoding()) {
+    return true;
+  }
+
+  // Only plain nested encodings are inspected further.
+  const auto encoding = vector->encoding();
+  const bool isNestedEncoding =
+      encoding == Encoding::ROW || encoding == Encoding::ARRAY || encoding == Encoding::MAP;
+  if (!isNestedEncoding) {
+    return false;
+  }
+
+  const auto kind = vector->typeKind();
+  if (kind == TypeKind::ROW) {
+    const auto* asRow = vector->as<facebook::velox::RowVector>();
+    for (const auto& field : asRow->children()) {
+      if (!supportsCopyRanges(field)) {
+        return false;
+      }
+    }
+    return true;
+  }
+  if (kind == TypeKind::ARRAY) {
+    const auto* asArray = vector->as<facebook::velox::ArrayVector>();
+    return supportsCopyRanges(asArray->elements());
+  }
+  if (kind == TypeKind::MAP) {
+    const auto* asMap = vector->as<facebook::velox::MapVector>();
+    return supportsCopyRanges(asMap->mapKeys()) && supportsCopyRanges(asMap->mapValues());
+  }
+  return false;
+}
+
+// Top-level eligibility check for an input batch: the vector must be non-null,
+// ROW-encoded and report no possible row-level nulls, and all of its children
+// must pass the recursive VectorPtr check.
+bool supportsCopyRanges(const facebook::velox::RowVectorPtr& rowVector) {
+  if (rowVector == nullptr) {
+    return false;
+  }
+  if (rowVector->encoding() != facebook::velox::VectorEncoding::Simple::ROW) {
+    return false;
+  }
+  if (rowVector->mayHaveNulls()) {
+    return false;
+  }
+  // Upcast explicitly so the VectorPtr overload is selected.
+  const facebook::velox::VectorPtr asBase = std::static_pointer_cast<facebook::velox::BaseVector>(rowVector);
+  return supportsCopyRanges(asBase);
+}
+
class SliceRowVector : public ColumnarBatchIterator {
public:
SliceRowVector(int32_t maxOutputBatchSize, facebook::velox::RowVectorPtr in)
@@ -50,17 +96,100 @@ gluten::VeloxBatchResizer::VeloxBatchResizer(
int32_t minOutputBatchSize,
int32_t maxOutputBatchSize,
int64_t preferredBatchBytes,
- std::unique_ptr<ColumnarBatchIterator> in)
+ std::unique_ptr<ColumnarBatchIterator> in,
+ bool enableCopyRanges,
+ VeloxBatchResizeStats* stats)
: pool_(pool),
minOutputBatchSize_(minOutputBatchSize),
maxOutputBatchSize_(maxOutputBatchSize),
preferredBatchBytes_(static_cast<uint64_t>(preferredBatchBytes)),
- in_(std::move(in)) {
+ enableCopyRanges_(enableCopyRanges),
+ in_(std::move(in)),
+ stats_(stats) {
GLUTEN_CHECK(
minOutputBatchSize_ > 0 && maxOutputBatchSize_ > 0,
"Either minOutputBatchSize or maxOutputBatchSize should be larger than
0");
}
+// Appends `input` to `buffer` with RowVector::append and, when a stats sink is
+// attached, counts it as one generic (append) copy.
+void VeloxBatchResizer::appendToBuffer(
+    facebook::velox::RowVectorPtr& buffer,
+    const facebook::velox::RowVectorPtr& input) {
+  buffer->append(input.get());
+  if (stats_ == nullptr) {
+    return;
+  }
+  stats_->appendCopyBatches += 1;
+}
+
+// Concatenates `inputs` into one newly created RowVector.
+//
+// The output is created from the first input's type and resized once to the
+// total row count. Each input is then written at its running offset: eligible
+// inputs (see supportsCopyRanges) are copied child by child with a single
+// CopyRange, all others go through RowVector::copy. Stats, when attached, are
+// updated per input, plus once per output if any input used copyRanges.
+//
+// Fails the GLUTEN_CHECK when `inputs` is empty.
+facebook::velox::RowVectorPtr VeloxBatchResizer::copyBufferedInputs(
+    const std::vector<facebook::velox::RowVectorPtr>& inputs) {
+  using facebook::velox::BaseVector;
+  using facebook::velox::vector_size_t;
+
+  GLUTEN_CHECK(!inputs.empty(), "Cannot copy empty inputs");
+
+  vector_size_t outputRows = 0;
+  for (const auto& in : inputs) {
+    outputRows += in->size();
+  }
+
+  auto merged = facebook::velox::RowVector::createEmpty(inputs[0]->type(), pool_);
+  merged->resize(outputRows);
+
+  bool anyCopyRanges = false;
+  vector_size_t writeOffset = 0;
+  for (const auto& in : inputs) {
+    const vector_size_t rows = in->size();
+    if (!supportsCopyRanges(in)) {
+      // Generic path for inputs that are not eligible for copyRanges.
+      merged->copy(in.get(), writeOffset, 0, rows);
+      if (stats_ != nullptr) {
+        ++stats_->copyRangesFallbackBatches;
+        ++stats_->appendCopyBatches;
+      }
+    } else {
+      // One range covering the whole input, applied to every child.
+      const BaseVector::CopyRange wholeInput{0, writeOffset, rows};
+      const int numChannels = static_cast<int>(in->children().size());
+      for (int channel = 0; channel < numChannels; ++channel) {
+        merged->childAt(channel)->copyRanges(in->childAt(channel)->loadedVector(), folly::Range(&wholeInput, 1));
+      }
+      anyCopyRanges = true;
+      if (stats_ != nullptr) {
+        ++stats_->copyRangesBatches;
+      }
+    }
+    writeOffset += rows;
+  }
+
+  if (anyCopyRanges && stats_ != nullptr) {
+    ++stats_->copyRangesOutputBatches;
+  }
+  return merged;
+}
+
+// Collects small input batches, starting with `firstInput`, and merges them
+// into one output batch via copyBufferedInputs.
+//
+// Batches are pulled from `in_` until one of the following happens:
+//  - the next batch would push the row count above maxOutputBatchSize_ or the
+//    byte count above preferredBatchBytes_. That batch is parked in `next_`
+//    as a SliceRowVector and the batches collected so far are flushed;
+//  - the collected row count reaches minOutputBatchSize_;
+//  - the input is exhausted.
+//
+// `numBytes` is the byte size already accounted for by `firstInput`.
+std::shared_ptr<ColumnarBatch> VeloxBatchResizer::collectAndCopy(
+    facebook::velox::RowVectorPtr firstInput,
+    uint64_t numBytes) {
+  std::vector<facebook::velox::RowVectorPtr> inputs;
+  inputs.push_back(std::move(firstInput));
+  // Rows are tracked in 64 bits: bufferedRows + incomingRows is compared with
+  // a limit that may be INT32_MAX, and the 32-bit sum could overflow.
+  int64_t bufferedRows = inputs.back()->size();
+
+  std::shared_ptr<ColumnarBatch> cb;
+  for (cb = in_->next(); cb != nullptr; cb = in_->next()) {
+    auto vb = VeloxColumnarBatch::from(pool_, cb);
+    auto rv = vb->getRowVector();
+    const uint64_t addedBytes = cb->numBytes();
+    const int64_t incomingRows = rv->size();
+    if (bufferedRows + incomingRows > maxOutputBatchSize_ || numBytes + addedBytes > preferredBatchBytes_) {
+      // The incoming batch does not fit: park it for the following next()
+      // calls and emit what has been collected so far.
+      GLUTEN_CHECK(next_ == nullptr, "Invalid state");
+      next_ = std::make_unique<SliceRowVector>(maxOutputBatchSize_, rv);
+      return std::make_shared<VeloxColumnarBatch>(copyBufferedInputs(inputs));
+    }
+
+    numBytes += addedBytes;
+    bufferedRows += incomingRows;
+    inputs.push_back(std::move(rv));
+    if (bufferedRows >= minOutputBatchSize_) {
+      // Enough rows collected.
+      break;
+    }
+  }
+
+  return std::make_shared<VeloxColumnarBatch>(copyBufferedInputs(inputs));
+}
+
std::shared_ptr<ColumnarBatch> VeloxBatchResizer::next() {
if (next_) {
auto next = next_->next();
@@ -81,8 +210,12 @@ std::shared_ptr<ColumnarBatch> VeloxBatchResizer::next() {
if (cb->numRows() < minOutputBatchSize_ && numBytes <= preferredBatchBytes_)
{
auto vb = VeloxColumnarBatch::from(pool_, cb);
auto rv = vb->getRowVector();
+ if (enableCopyRanges_) {
+ return collectAndCopy(std::move(rv), numBytes);
+ }
+
auto buffer = facebook::velox::RowVector::createEmpty(rv->type(), pool_);
- buffer->append(rv.get());
+ appendToBuffer(buffer, rv);
for (cb = in_->next(); cb != nullptr; cb = in_->next()) {
vb = VeloxColumnarBatch::from(pool_, cb);
@@ -95,7 +228,7 @@ std::shared_ptr<ColumnarBatch> VeloxBatchResizer::next() {
return std::make_shared<VeloxColumnarBatch>(buffer);
}
numBytes += addedBytes;
- buffer->append(rv.get());
+ appendToBuffer(buffer, rv);
if (buffer->size() >= minOutputBatchSize_) {
// Buffer is full.
break;
diff --git a/cpp/velox/utils/VeloxBatchResizer.h
b/cpp/velox/utils/VeloxBatchResizer.h
index 8afd191dca..5a6b71e931 100644
--- a/cpp/velox/utils/VeloxBatchResizer.h
+++ b/cpp/velox/utils/VeloxBatchResizer.h
@@ -23,6 +23,15 @@
namespace gluten {
+// Optional counters filled in by VeloxBatchResizer when a stats pointer is
+// passed to its constructor. All counters start at zero.
+struct VeloxBatchResizeStats {
+ // Number of input batches copied into an output via per-child copyRanges.
+ int64_t copyRangesBatches{0};
+ // Number of output batches in which at least one input used copyRanges.
+ int64_t copyRangesOutputBatches{0};
+ // Counts generic copies: RowVector::append when copyRanges is disabled and
+ // RowVector::copy fallbacks when copyRanges is enabled.
+ int64_t appendCopyBatches{0};
+ // Number of input batches that were not eligible for copyRanges while it was
+ // enabled and were copied with RowVector::copy instead.
+ int64_t copyRangesFallbackBatches{0};
+};
+
class VeloxBatchResizer : public ColumnarBatchIterator {
public:
VeloxBatchResizer(
@@ -30,7 +39,9 @@ class VeloxBatchResizer : public ColumnarBatchIterator {
int32_t minOutputBatchSize,
int32_t maxOutputBatchSize,
int64_t preferredBatchBytes,
- std::unique_ptr<ColumnarBatchIterator> in);
+ std::unique_ptr<ColumnarBatchIterator> in,
+ bool enableCopyRanges = true,
+ VeloxBatchResizeStats* stats = nullptr);
std::shared_ptr<ColumnarBatch> next() override;
@@ -41,9 +52,17 @@ class VeloxBatchResizer : public ColumnarBatchIterator {
const int32_t minOutputBatchSize_;
const int32_t maxOutputBatchSize_;
const uint64_t preferredBatchBytes_;
+ const bool enableCopyRanges_;
std::unique_ptr<ColumnarBatchIterator> in_;
+ VeloxBatchResizeStats* stats_;
std::unique_ptr<ColumnarBatchIterator> next_ = nullptr;
+
+ void appendToBuffer(facebook::velox::RowVectorPtr& buffer, const
facebook::velox::RowVectorPtr& input);
+
+ facebook::velox::RowVectorPtr copyBufferedInputs(const
std::vector<facebook::velox::RowVectorPtr>& inputs);
+
+ std::shared_ptr<ColumnarBatch> collectAndCopy(facebook::velox::RowVectorPtr
firstInput, uint64_t numBytes);
};
} // namespace gluten
diff --git a/docs/velox-configuration.md b/docs/velox-configuration.md
index a0c0691e2f..342a9b8f34 100644
--- a/docs/velox-configuration.md
+++ b/docs/velox-configuration.md
@@ -9,78 +9,79 @@ nav_order: 16
## Gluten Velox backend configurations
-| Key
| Default |
Description
[...]
-|----------------------------------------------------------------------------------|-------------------|----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
[...]
-| spark.gluten.sql.columnar.backend.velox.IOThreads
| <undefined> | The Size of the IO thread pool in the Connector. This
thread pool is used for split preloading and DirectBufferedInput. By default,
the value is the same as the maximum task slots per Spark executor.
[...]
-| spark.gluten.sql.columnar.backend.velox.SplitPreloadPerDriver
| 2 | The split preload per task
[...]
-| spark.gluten.sql.columnar.backend.velox.abandonPartialAggregationMinPct
| 90 | If partial aggregation aggregationPct greater than
this value, partial aggregation may be early abandoned. Note: this option only
works when flushable partial aggregation is enabled. Ignored when
spark.gluten.sql.columnar.backend.velox.flushablePartialAggregation=false.
[...]
-| spark.gluten.sql.columnar.backend.velox.abandonPartialAggregationMinRows
| 100000 | If partial aggregation input rows number greater than
this value, partial aggregation may be early abandoned. Note: this option only
works when flushable partial aggregation is enabled. Ignored when
spark.gluten.sql.columnar.backend.velox.flushablePartialAggregation=false.
[...]
-| spark.gluten.sql.columnar.backend.velox.asyncTimeoutOnTaskStopping
| 30000ms | Timeout in milliseconds when waiting for
runtime-scoped async work to finish during teardown.
[...]
-| spark.gluten.sql.columnar.backend.velox.cacheEnabled
| false | Enable Velox cache, default off. It's recommended to
enablesoft-affinity as well when enable velox cache.
[...]
-| spark.gluten.sql.columnar.backend.velox.cachePrefetchMinPct
| 0 | Set prefetch cache min pct for velox file scan
[...]
-| spark.gluten.sql.columnar.backend.velox.checkUsageLeak
| true | Enable check memory usage leak.
[...]
-| spark.gluten.sql.columnar.backend.velox.cudf.batchSize
| 2147483647 | Cudf input batch size after shuffle reader
[...]
-| spark.gluten.sql.columnar.backend.velox.cudf.enableTableScan
| false | Enable cudf table scan
[...]
-| spark.gluten.sql.columnar.backend.velox.cudf.enableValidation
| true | Heuristics you can apply to validate a cuDF/GPU plan
and only offload when the entire stage can be fully and profitably executed on
GPU
[...]
-| spark.gluten.sql.columnar.backend.velox.cudf.memoryPercent
| 50 | The initial percent of GPU memory to allocate for
memory resource for one thread.
[...]
-| spark.gluten.sql.columnar.backend.velox.cudf.memoryResource
| async | GPU RMM memory resource.
[...]
-| spark.gluten.sql.columnar.backend.velox.cudf.shuffleMaxPrefetchBytes
| 1028MB | Maximum bytes to prefetch in CPU memory during GPU
shuffle read while waiting for GPU available.
[...]
-| spark.gluten.sql.columnar.backend.velox.directorySizeGuess
| 32KB | Deprecated, rename to
spark.gluten.sql.columnar.backend.velox.footerEstimatedSize
[...]
-| spark.gluten.sql.columnar.backend.velox.enableTimestampNtzValidation
| true | Enable validation fallback for TimestampNTZ type.
When true (default), any plan containing TimestampNTZ will fall back to Spark
execution. Set to false during development/testing of TimestampNTZ support to
allow native execution.
[...]
-| spark.gluten.sql.columnar.backend.velox.fileHandleCacheEnabled
| false | Disables caching if false. File handle cache should
be disabled if files are mutable, i.e. file content may change while file path
stays the same.
[...]
-| spark.gluten.sql.columnar.backend.velox.filePreloadThreshold
| 1MB | Set the file preload threshold for velox file scan,
refer to Velox's file-preload-threshold
[...]
-| spark.gluten.sql.columnar.backend.velox.floatingPointMode
| loose | Config used to control the tolerance of floating
point operations alignment with Spark. When the mode is set to strict, flushing
is disabled for sum(float/double)and avg(float/double). When set to loose,
flushing will be enabled.
[...]
-| spark.gluten.sql.columnar.backend.velox.flushablePartialAggregation
| true | Enable flushable aggregation. If true, Gluten will
try converting regular aggregation into Velox's flushable aggregation when
applicable. A flushable aggregation could emit intermediate result at anytime
when memory is full / data reduction ratio is low.
[...]
-| spark.gluten.sql.columnar.backend.velox.footerEstimatedSize
| 32KB | Set the footer estimated size for velox file scan,
refer to Velox's footer-estimated-size
[...]
-|
spark.gluten.sql.columnar.backend.velox.hashProbe.bloomFilterPushdown.maxSize
| 0b | The maximum byte size of Bloom filter that can be
generated from hash probe. When set to 0, no Bloom filter will be generated. To
achieve optimal performance, this should not be too larger than the CPU cache
size on the host.
[...]
-|
spark.gluten.sql.columnar.backend.velox.hashProbe.dynamicFilterPushdown.enabled
| true | Whether hash probe can generate any dynamic filter
(including Bloom filter) and push down to upstream operators.
[...]
-|
spark.gluten.sql.columnar.backend.velox.hashShuffle.reader.streamMerge.enabled
| false | Enables a reader-side raw payload merge fast path for
plain hash shuffle payloads within each shuffle input stream. This path merges
payload buffers before Velox vectors are materialized, so it has lower
per-batch overhead than generic VeloxResizeBatchesExec resizing, but it only
covers plain payloads. Complex types and dictionary-encoded payloads are not
merged by this path. VeloxRes [...]
-| spark.gluten.sql.columnar.backend.velox.loadQuantum
| 256MB | Set the load quantum for velox file scan, recommend
to use the default value (256MB) for performance consideration. If Velox cache
is enabled, it can be 8MB at most.
[...]
-| spark.gluten.sql.columnar.backend.velox.maxCoalescedBytes
| 64MB | Set the max coalesced bytes for velox file scan
[...]
-| spark.gluten.sql.columnar.backend.velox.maxCoalescedDistance
| 512KB | Set the max coalesced distance bytes for velox file
scan
[...]
-| spark.gluten.sql.columnar.backend.velox.maxCompiledRegexes
| 100 | Controls maximum number of compiled regular
expression patterns per function instance per thread of execution.
[...]
-| spark.gluten.sql.columnar.backend.velox.maxExtendedPartialAggregationMemory
| <undefined> | Set the max extended memory of partial aggregation in
bytes. When this option is set to a value greater than 0, it will override
spark.gluten.sql.columnar.backend.velox.maxExtendedPartialAggregationMemoryRatio.
Note: this option only works when flushable partial aggregation is enabled.
Ignored when
spark.gluten.sql.columnar.backend.velox.flushablePartialAggregation=false.
[...]
-|
spark.gluten.sql.columnar.backend.velox.maxExtendedPartialAggregationMemoryRatio
| 0.15 | Set the max extended memory of partial aggregation as
maxExtendedPartialAggregationMemoryRatio of offheap size. Note: this option
only works when flushable partial aggregation is enabled. Ignored when
spark.gluten.sql.columnar.backend.velox.flushablePartialAggregation=false.
[...]
-| spark.gluten.sql.columnar.backend.velox.maxPartialAggregationMemory
| <undefined> | Set the max memory of partial aggregation in bytes.
When this option is set to a value greater than 0, it will override
spark.gluten.sql.columnar.backend.velox.maxPartialAggregationMemoryRatio. Note:
this option only works when flushable partial aggregation is enabled. Ignored
when spark.gluten.sql.columnar.backend.velox.flushablePartialAggregation=false.
[...]
-| spark.gluten.sql.columnar.backend.velox.maxPartialAggregationMemoryRatio
| 0.1 | Set the max memory of partial aggregation as
maxPartialAggregationMemoryRatio of offheap size. Note: this option only works
when flushable partial aggregation is enabled. Ignored when
spark.gluten.sql.columnar.backend.velox.flushablePartialAggregation=false.
[...]
-| spark.gluten.sql.columnar.backend.velox.maxPartitionsPerWritersSession
| 10000 | Maximum number of partitions per a single table
writer instance.
[...]
-| spark.gluten.sql.columnar.backend.velox.maxSpillBytes
| 100G | The maximum file size of a query
[...]
-| spark.gluten.sql.columnar.backend.velox.maxSpillFileSize
| 1GB | The maximum size of a single spill file created
[...]
-| spark.gluten.sql.columnar.backend.velox.maxSpillLevel
| 4 | The max allowed spilling level with zero being the
initial spilling level
[...]
-| spark.gluten.sql.columnar.backend.velox.maxSpillRunRows
| 3M | The maximum row size of a single spill run
[...]
-| spark.gluten.sql.columnar.backend.velox.maxTargetFileSize
| 0b | The target file size for each output file when
writing data. 0 means no limit on target file size, and the actual file size
will be determined by other factors such as max partition number and shuffle
batch size.
[...]
-| spark.gluten.sql.columnar.backend.velox.memCacheSize
| 1GB | The memory cache size
[...]
-| spark.gluten.sql.columnar.backend.velox.memInitCapacity
| 8MB | The initial memory capacity to reserve for a newly
created Velox query memory pool.
[...]
-|
spark.gluten.sql.columnar.backend.velox.memoryPoolCapacityTransferAcrossTasks
| true | Whether to allow memory capacity transfer between memory
pools from different tasks.
[...]
-| spark.gluten.sql.columnar.backend.velox.memoryUseHugePages
| false | Use explicit huge pages for Velox memory allocation.
[...]
-| spark.gluten.sql.columnar.backend.velox.orc.scan.enabled
| true | Enable velox orc scan. If disabled, vanilla spark orc
scan will be used.
[...]
-| spark.gluten.sql.columnar.backend.velox.orcUseColumnNames
| true | Maps table field names to file field names using
names, not indices for ORC files.
[...]
-| spark.gluten.sql.columnar.backend.velox.parquet.pageSizeBytes
| 1MB | The page size in bytes is for compression.
[...]
-| spark.gluten.sql.columnar.backend.velox.parquetUseColumnNames
| true | Maps table field names to file field names using
names, not indices for Parquet files.
[...]
-| spark.gluten.sql.columnar.backend.velox.prefetchRowGroups
| 1 | Set the prefetch row groups for velox file scan
[...]
-| spark.gluten.sql.columnar.backend.velox.queryTraceEnabled
| false | Enable query tracing flag.
[...]
-| spark.gluten.sql.columnar.backend.velox.reclaimMaxWaitMs
| 3600000ms | The max time in ms to wait for memory reclaim.
[...]
-| spark.gluten.sql.columnar.backend.velox.resizeBatches.shuffleInput
| true | If true, combine small columnar batches together
before sending to shuffle. The default minimum output batch size is equal to
0.25 * spark.gluten.sql.columnar.maxBatchSize
[...]
-| spark.gluten.sql.columnar.backend.velox.resizeBatches.shuffleInput.minSize
| <undefined> | The minimum batch size for shuffle. If size of an
input batch is smaller than the value, it will be combined with other batches
before sending to shuffle. Only functions when
spark.gluten.sql.columnar.backend.velox.resizeBatches.shuffleInput is set to
true. Default value: 0.25 * <max batch size>
[...]
-|
spark.gluten.sql.columnar.backend.velox.resizeBatches.shuffleInputOuptut.minSize
| <undefined> | The minimum batch size for shuffle input and output. If
size of an input batch is smaller than the value, it will be combined with
other batches before sending to shuffle. The same applies for batches output by
shuffle read. Only functions when
spark.gluten.sql.columnar.backend.velox.resizeBatches.shuffleInput or
spark.gluten.sql.columnar.backend.velox.resizeBatches.shuffleOutput is s [...]
-| spark.gluten.sql.columnar.backend.velox.resizeBatches.shuffleOutput
| false | If true, combine small columnar batches together
right after shuffle read. The default minimum output batch size is equal to
0.25 * spark.gluten.sql.columnar.maxBatchSize
[...]
-| spark.gluten.sql.columnar.backend.velox.showTaskMetricsWhenFinished
| false | Show velox full task metrics when finished.
[...]
-| spark.gluten.sql.columnar.backend.velox.spillFileSystem
| local | The filesystem used to store spill data. local: The
local file system. heap-over-local: Write file to JVM heap if having extra heap
space. Otherwise write to local file system.
[...]
-| spark.gluten.sql.columnar.backend.velox.spillStrategy
| auto | none: Disable spill on Velox backend; auto: Let Spark
memory manager manage Velox's spilling
[...]
-| spark.gluten.sql.columnar.backend.velox.ssdCacheIOThreads
| 1 | The IO threads for cache promoting
[...]
-| spark.gluten.sql.columnar.backend.velox.ssdCachePath
| /tmp | The folder to store the cache files, better on SSD
[...]
-| spark.gluten.sql.columnar.backend.velox.ssdCacheShards
| 1 | The cache shards
[...]
-| spark.gluten.sql.columnar.backend.velox.ssdCacheSize
| 1GB | The SSD cache size, will do memory caching only if
this value = 0
[...]
-| spark.gluten.sql.columnar.backend.velox.ssdCheckpointIntervalBytes
| 0 | Checkpoint after every 'checkpointIntervalBytes' for
SSD cache. 0 means no checkpointing.
[...]
-| spark.gluten.sql.columnar.backend.velox.ssdChecksumEnabled
| false | If true, checksum write to SSD is enabled.
[...]
-| spark.gluten.sql.columnar.backend.velox.ssdChecksumReadVerificationEnabled
| false | If true, checksum read verification from SSD is
enabled.
[...]
-| spark.gluten.sql.columnar.backend.velox.ssdDisableFileCow
| false | True if copy on write should be disabled.
[...]
-| spark.gluten.sql.columnar.backend.velox.ssdODirect
| false | The O_DIRECT flag for cache writing
[...]
-| spark.gluten.sql.columnar.backend.velox.valueStream.dynamicFilter.enabled
| false | Whether to apply dynamic filters pushed down from
hash probe in the ValueStream (shuffle reader) operator to filter rows before
they reach the hash join.
[...]
-| spark.gluten.sql.enable.enhancedFeatures
| true | Enable some features including iceberg native write
and other features.
[...]
-| spark.gluten.sql.rewrite.castArrayToString
| true | When true, rewrite `cast(array as String)` to
`concat('[', array_join(array, ', ', null), ']')` to allow offloading to Velox.
[...]
-| spark.gluten.velox.broadcast.build.targetBytesPerThread
| 32MB | It is used to calculate the number of hash table
build threads. Based on our testing across various thresholds (1MB to 128MB),
we recommend a value of 32MB or 64MB, as these consistently provided the most
significant performance gains.
[...]
-| spark.gluten.velox.castFromVarcharAddTrimNode
| false | If true, will add a trim node which has the same
semantic as vanilla Spark to CAST-from-varchar.Otherwise, do nothing.
[...]
+| Key
| Default |
[...]
+|----------------------------------------------------------------------------------|-------------------|----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
[...]
+| spark.gluten.sql.columnar.backend.velox.IOThreads
| <undefined> | The Size of the IO thread pool in the Connector. This
thread pool is used for split preloading and DirectBufferedInput. By default,
the value is the same as the maximum task slots per Spark executor.
[...]
+| spark.gluten.sql.columnar.backend.velox.SplitPreloadPerDriver
| 2 | The split preload per task
[...]
+| spark.gluten.sql.columnar.backend.velox.abandonPartialAggregationMinPct
| 90 | If partial aggregation aggregationPct greater than
this value, partial aggregation may be early abandoned. Note: this option only
works when flushable partial aggregation is enabled. Ignored when
spark.gluten.sql.columnar.backend.velox.flushablePartialAggregation=false.
[...]
+| spark.gluten.sql.columnar.backend.velox.abandonPartialAggregationMinRows
| 100000 | If partial aggregation input rows number greater than
this value, partial aggregation may be early abandoned. Note: this option only
works when flushable partial aggregation is enabled. Ignored when
spark.gluten.sql.columnar.backend.velox.flushablePartialAggregation=false.
[...]
+| spark.gluten.sql.columnar.backend.velox.asyncTimeoutOnTaskStopping
| 30000ms | Timeout in milliseconds when waiting for
runtime-scoped async work to finish during teardown.
[...]
+| spark.gluten.sql.columnar.backend.velox.cacheEnabled
| false | Enable Velox cache, default off. It's recommended to
enablesoft-affinity as well when enable velox cache.
[...]
+| spark.gluten.sql.columnar.backend.velox.cachePrefetchMinPct
| 0 | Set prefetch cache min pct for velox file scan
[...]
+| spark.gluten.sql.columnar.backend.velox.checkUsageLeak
| true | Enable check memory usage leak.
[...]
+| spark.gluten.sql.columnar.backend.velox.cudf.batchSize
| 2147483647 | Cudf input batch size after shuffle reader
[...]
+| spark.gluten.sql.columnar.backend.velox.cudf.enableTableScan
| false | Enable cudf table scan
[...]
+| spark.gluten.sql.columnar.backend.velox.cudf.enableValidation
| true | Heuristics you can apply to validate a cuDF/GPU plan
and only offload when the entire stage can be fully and profitably executed on
GPU
[...]
+| spark.gluten.sql.columnar.backend.velox.cudf.memoryPercent
| 50 | The initial percent of GPU memory to allocate for
memory resource for one thread.
[...]
+| spark.gluten.sql.columnar.backend.velox.cudf.memoryResource
| async | GPU RMM memory resource.
[...]
+| spark.gluten.sql.columnar.backend.velox.cudf.shuffleMaxPrefetchBytes
| 1028MB | Maximum bytes to prefetch in CPU memory during GPU
shuffle read while waiting for GPU available.
[...]
+| spark.gluten.sql.columnar.backend.velox.directorySizeGuess
| 32KB | Deprecated, rename to
spark.gluten.sql.columnar.backend.velox.footerEstimatedSize
[...]
+| spark.gluten.sql.columnar.backend.velox.enableTimestampNtzValidation
| true | Enable validation fallback for TimestampNTZ type.
When true (default), any plan containing TimestampNTZ will fall back to Spark
execution. Set to false during development/testing of TimestampNTZ support to
allow native execution.
[...]
+| spark.gluten.sql.columnar.backend.velox.fileHandleCacheEnabled
| false | Disables caching if false. File handle cache should
be disabled if files are mutable, i.e. file content may change while file path
stays the same.
[...]
+| spark.gluten.sql.columnar.backend.velox.filePreloadThreshold
| 1MB | Set the file preload threshold for velox file scan,
refer to Velox's file-preload-threshold
[...]
+| spark.gluten.sql.columnar.backend.velox.floatingPointMode
| loose | Config used to control the tolerance of floating
point operations alignment with Spark. When the mode is set to strict, flushing
is disabled for sum(float/double)and avg(float/double). When set to loose,
flushing will be enabled.
[...]
+| spark.gluten.sql.columnar.backend.velox.flushablePartialAggregation
| true | Enable flushable aggregation. If true, Gluten will
try converting regular aggregation into Velox's flushable aggregation when
applicable. A flushable aggregation could emit intermediate result at anytime
when memory is full / data reduction ratio is low.
[...]
+| spark.gluten.sql.columnar.backend.velox.footerEstimatedSize
| 32KB | Set the footer estimated size for velox file scan,
refer to Velox's footer-estimated-size
[...]
+|
spark.gluten.sql.columnar.backend.velox.hashProbe.bloomFilterPushdown.maxSize
| 0b | The maximum byte size of Bloom filter that can be
generated from hash probe. When set to 0, no Bloom filter will be generated. To
achieve optimal performance, this should not be too larger than the CPU cache
size on the host.
[...]
+|
spark.gluten.sql.columnar.backend.velox.hashProbe.dynamicFilterPushdown.enabled
| true | Whether hash probe can generate any dynamic filter
(including Bloom filter) and push down to upstream operators.
[...]
+|
spark.gluten.sql.columnar.backend.velox.hashShuffle.reader.streamMerge.enabled
| false | Enables a reader-side raw payload merge fast path for
plain hash shuffle payloads within each shuffle input stream. This path merges
payload buffers before Velox vectors are materialized, so it has lower
per-batch overhead than generic VeloxResizeBatchesExec resizing, but it only
covers plain payloads. Complex types and dictionary-encoded payloads are not
merged by this path. VeloxRes [...]
+| spark.gluten.sql.columnar.backend.velox.loadQuantum
| 256MB | Set the load quantum for velox file scan, recommend
to use the default value (256MB) for performance consideration. If Velox cache
is enabled, it can be 8MB at most.
[...]
+| spark.gluten.sql.columnar.backend.velox.maxCoalescedBytes
| 64MB | Set the max coalesced bytes for velox file scan
[...]
+| spark.gluten.sql.columnar.backend.velox.maxCoalescedDistance
| 512KB | Set the max coalesced distance bytes for velox file
scan
[...]
+| spark.gluten.sql.columnar.backend.velox.maxCompiledRegexes
| 100 | Controls maximum number of compiled regular
expression patterns per function instance per thread of execution.
[...]
+| spark.gluten.sql.columnar.backend.velox.maxExtendedPartialAggregationMemory
| <undefined> | Set the max extended memory of partial aggregation in
bytes. When this option is set to a value greater than 0, it will override
spark.gluten.sql.columnar.backend.velox.maxExtendedPartialAggregationMemoryRatio.
Note: this option only works when flushable partial aggregation is enabled.
Ignored when
spark.gluten.sql.columnar.backend.velox.flushablePartialAggregation=false.
[...]
+|
spark.gluten.sql.columnar.backend.velox.maxExtendedPartialAggregationMemoryRatio
| 0.15 | Set the max extended memory of partial aggregation as
maxExtendedPartialAggregationMemoryRatio of offheap size. Note: this option
only works when flushable partial aggregation is enabled. Ignored when
spark.gluten.sql.columnar.backend.velox.flushablePartialAggregation=false.
[...]
+| spark.gluten.sql.columnar.backend.velox.maxPartialAggregationMemory
| <undefined> | Set the max memory of partial aggregation in bytes.
When this option is set to a value greater than 0, it will override
spark.gluten.sql.columnar.backend.velox.maxPartialAggregationMemoryRatio. Note:
this option only works when flushable partial aggregation is enabled. Ignored
when spark.gluten.sql.columnar.backend.velox.flushablePartialAggregation=false.
[...]
+| spark.gluten.sql.columnar.backend.velox.maxPartialAggregationMemoryRatio
| 0.1 | Set the max memory of partial aggregation as
maxPartialAggregationMemoryRatio of offheap size. Note: this option only works
when flushable partial aggregation is enabled. Ignored when
spark.gluten.sql.columnar.backend.velox.flushablePartialAggregation=false.
[...]
+| spark.gluten.sql.columnar.backend.velox.maxPartitionsPerWritersSession
| 10000 | Maximum number of partitions per a single table
writer instance.
[...]
+| spark.gluten.sql.columnar.backend.velox.maxSpillBytes
| 100G | The maximum file size of a query
[...]
+| spark.gluten.sql.columnar.backend.velox.maxSpillFileSize
| 1GB | The maximum size of a single spill file created
[...]
+| spark.gluten.sql.columnar.backend.velox.maxSpillLevel
| 4 | The max allowed spilling level with zero being the
initial spilling level
[...]
+| spark.gluten.sql.columnar.backend.velox.maxSpillRunRows
| 3M | The maximum row size of a single spill run
[...]
+| spark.gluten.sql.columnar.backend.velox.maxTargetFileSize
| 0b | The target file size for each output file when
writing data. 0 means no limit on target file size, and the actual file size
will be determined by other factors such as max partition number and shuffle
batch size.
[...]
+| spark.gluten.sql.columnar.backend.velox.memCacheSize
| 1GB | The memory cache size
[...]
+| spark.gluten.sql.columnar.backend.velox.memInitCapacity
| 8MB | The initial memory capacity to reserve for a newly
created Velox query memory pool.
[...]
+|
spark.gluten.sql.columnar.backend.velox.memoryPoolCapacityTransferAcrossTasks
| true | Whether to allow memory capacity transfer between memory
pools from different tasks.
[...]
+| spark.gluten.sql.columnar.backend.velox.memoryUseHugePages
| false | Use explicit huge pages for Velox memory allocation.
[...]
+| spark.gluten.sql.columnar.backend.velox.orc.scan.enabled
| true | Enable velox orc scan. If disabled, vanilla spark orc
scan will be used.
[...]
+| spark.gluten.sql.columnar.backend.velox.orcUseColumnNames
| true | Maps table field names to file field names using
names, not indices for ORC files.
[...]
+| spark.gluten.sql.columnar.backend.velox.parquet.pageSizeBytes
| 1MB | The page size in bytes is for compression.
[...]
+| spark.gluten.sql.columnar.backend.velox.parquetUseColumnNames
| true | Maps table field names to file field names using
names, not indices for Parquet files.
[...]
+| spark.gluten.sql.columnar.backend.velox.prefetchRowGroups
| 1 | Set the prefetch row groups for velox file scan
[...]
+| spark.gluten.sql.columnar.backend.velox.queryTraceEnabled
| false | Enable query tracing flag.
[...]
+| spark.gluten.sql.columnar.backend.velox.reclaimMaxWaitMs
| 3600000ms | The max time in ms to wait for memory reclaim.
[...]
+| spark.gluten.sql.columnar.backend.velox.resizeBatches.copyRanges.enabled
| true | Enables a VeloxResizeBatchesExec fast path that
combines eligible batches using Velox vector copyRanges instead of generic
RowVector append. When possible, it collects the small input batches for one
VeloxResizeBatchesExec output, allocates the output RowVector once, and
bulk-copies child vector ranges. This is most useful for shuffle-read outputs
where plain hash shuffle payloads are [...]
+| spark.gluten.sql.columnar.backend.velox.resizeBatches.shuffleInput
| true | If true, combine small columnar batches together
before sending to shuffle. The default minimum output batch size is equal to
0.25 * spark.gluten.sql.columnar.maxBatchSize
[...]
+| spark.gluten.sql.columnar.backend.velox.resizeBatches.shuffleInput.minSize
| <undefined> | The minimum batch size for shuffle. If size of an
input batch is smaller than the value, it will be combined with other batches
before sending to shuffle. Only functions when
spark.gluten.sql.columnar.backend.velox.resizeBatches.shuffleInput is set to
true. Default value: 0.25 * <max batch size>
[...]
+|
spark.gluten.sql.columnar.backend.velox.resizeBatches.shuffleInputOuptut.minSize
| <undefined> | The minimum batch size for shuffle input and output. If
size of an input batch is smaller than the value, it will be combined with
other batches before sending to shuffle. The same applies for batches output by
shuffle read. Only functions when
spark.gluten.sql.columnar.backend.velox.resizeBatches.shuffleInput or
spark.gluten.sql.columnar.backend.velox.resizeBatches.shuffleOutput is s [...]
+| spark.gluten.sql.columnar.backend.velox.resizeBatches.shuffleOutput
| false | If true, combine small columnar batches together
right after shuffle read. The default minimum output batch size is equal to
0.25 * spark.gluten.sql.columnar.maxBatchSize
[...]
+| spark.gluten.sql.columnar.backend.velox.showTaskMetricsWhenFinished
| false | Show velox full task metrics when finished.
[...]
+| spark.gluten.sql.columnar.backend.velox.spillFileSystem
| local | The filesystem used to store spill data. local: The
local file system. heap-over-local: Write file to JVM heap if having extra heap
space. Otherwise write to local file system.
[...]
+| spark.gluten.sql.columnar.backend.velox.spillStrategy
| auto | none: Disable spill on Velox backend; auto: Let Spark
memory manager manage Velox's spilling
[...]
+| spark.gluten.sql.columnar.backend.velox.ssdCacheIOThreads
| 1 | The IO threads for cache promoting
[...]
+| spark.gluten.sql.columnar.backend.velox.ssdCachePath
| /tmp | The folder to store the cache files, better on SSD
[...]
+| spark.gluten.sql.columnar.backend.velox.ssdCacheShards
| 1 | The cache shards
[...]
+| spark.gluten.sql.columnar.backend.velox.ssdCacheSize
| 1GB | The SSD cache size, will do memory caching only if
this value = 0
[...]
+| spark.gluten.sql.columnar.backend.velox.ssdCheckpointIntervalBytes
| 0 | Checkpoint after every 'checkpointIntervalBytes' for
SSD cache. 0 means no checkpointing.
[...]
+| spark.gluten.sql.columnar.backend.velox.ssdChecksumEnabled
| false | If true, checksum write to SSD is enabled.
[...]
+| spark.gluten.sql.columnar.backend.velox.ssdChecksumReadVerificationEnabled
| false | If true, checksum read verification from SSD is
enabled.
[...]
+| spark.gluten.sql.columnar.backend.velox.ssdDisableFileCow
| false | True if copy on write should be disabled.
[...]
+| spark.gluten.sql.columnar.backend.velox.ssdODirect
| false | The O_DIRECT flag for cache writing
[...]
+| spark.gluten.sql.columnar.backend.velox.valueStream.dynamicFilter.enabled
| false | Whether to apply dynamic filters pushed down from
hash probe in the ValueStream (shuffle reader) operator to filter rows before
they reach the hash join.
[...]
+| spark.gluten.sql.enable.enhancedFeatures
| true | Enable some features including iceberg native write
and other features.
[...]
+| spark.gluten.sql.rewrite.castArrayToString
| true | When true, rewrite `cast(array as String)` to
`concat('[', array_join(array, ', ', null), ']')` to allow offloading to Velox.
[...]
+| spark.gluten.velox.broadcast.build.targetBytesPerThread
| 32MB | It is used to calculate the number of hash table
build threads. Based on our testing across various thresholds (1MB to 128MB),
we recommend a value of 32MB or 64MB, as these consistently provided the most
significant performance gains.
[...]
+| spark.gluten.velox.castFromVarcharAddTrimNode
| false | If true, will add a trim node which has the same
semantic as vanilla Spark to CAST-from-varchar.Otherwise, do nothing.
[...]
## Gluten Velox backend *experimental* configurations
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]