This is an automated email from the ASF dual-hosted git repository.

zhli1142015 pushed a commit to branch vl-hash-shuffle-reader-stream-merge
in repository https://gitbox.apache.org/repos/asf/gluten.git

commit 418503b6c8a764caefb87a5ee7aeeb9ad61f704b
Author: zhli1142015 <[email protected]>
AuthorDate: Fri May 15 23:13:47 2026 +0800

    [VL] Add hash shuffle reader stream merge config
    
    Gate the reader-side raw payload merge fast path behind a Velox config and
    document how it complements VeloxResizeBatchesExec.
    
    Co-authored-by: Copilot <[email protected]>
---
 .../VeloxCelebornColumnarBatchSerializer.scala     |   6 +-
 .../org/apache/gluten/config/VeloxConfig.scala     |  17 +++
 .../vectorized/ColumnarBatchSerializer.scala       |   7 +-
 cpp/core/jni/JniWrapper.cc                         |   4 +-
 cpp/core/shuffle/Options.h                         |   4 +
 cpp/velox/compute/VeloxRuntime.cc                  |   3 +-
 cpp/velox/shuffle/VeloxShuffleReader.cc            |  17 ++-
 cpp/velox/shuffle/VeloxShuffleReader.h             |   6 +-
 cpp/velox/tests/VeloxShuffleWriterTest.cc          |  67 +++++++---
 docs/velox-configuration.md                        | 143 +++++++++++----------
 .../gluten/vectorized/ShuffleReaderJniWrapper.java |   3 +-
 11 files changed, 177 insertions(+), 100 deletions(-)

diff --git 
a/backends-velox/src-celeborn/main/scala/org/apache/spark/shuffle/VeloxCelebornColumnarBatchSerializer.scala
 
b/backends-velox/src-celeborn/main/scala/org/apache/spark/shuffle/VeloxCelebornColumnarBatchSerializer.scala
index f377ea99f6..2c36c773f4 100644
--- 
a/backends-velox/src-celeborn/main/scala/org/apache/spark/shuffle/VeloxCelebornColumnarBatchSerializer.scala
+++ 
b/backends-velox/src-celeborn/main/scala/org/apache/spark/shuffle/VeloxCelebornColumnarBatchSerializer.scala
@@ -17,7 +17,7 @@
 package org.apache.spark.shuffle
 
 import org.apache.gluten.backendsapi.BackendsApiManager
-import org.apache.gluten.config.{GlutenConfig, ShuffleWriterType}
+import org.apache.gluten.config.{GlutenConfig, ShuffleWriterType, VeloxConfig}
 import org.apache.gluten.memory.arrow.alloc.ArrowBufferAllocators
 import org.apache.gluten.runtime.Runtimes
 import org.apache.gluten.utils.ArrowAbiUtil
@@ -95,6 +95,7 @@ private class CelebornColumnarBatchSerializerInstance(
     val batchSize = GlutenConfig.get.maxBatchSize
     val readerBufferSize = GlutenConfig.get.columnarShuffleReaderBufferSize
     val deserializerBufferSize = 
GlutenConfig.get.columnarSortShuffleDeserializerBufferSize
+    val enableHashShuffleReaderStreamMerge = 
VeloxConfig.get.enableHashShuffleReaderStreamMerge
     val handle = jniWrapper
       .make(
         cSchema.memoryAddress(),
@@ -103,7 +104,8 @@ private class CelebornColumnarBatchSerializerInstance(
         batchSize,
         readerBufferSize,
         deserializerBufferSize,
-        shuffleWriterType.name
+        shuffleWriterType.name,
+        enableHashShuffleReaderStreamMerge
       )
     // Close shuffle reader instance as lately as the end of task processing,
     // since the native reader could hold a reference to memory pool that
diff --git 
a/backends-velox/src/main/scala/org/apache/gluten/config/VeloxConfig.scala 
b/backends-velox/src/main/scala/org/apache/gluten/config/VeloxConfig.scala
index dbc833f046..9ea203d42b 100644
--- a/backends-velox/src/main/scala/org/apache/gluten/config/VeloxConfig.scala
+++ b/backends-velox/src/main/scala/org/apache/gluten/config/VeloxConfig.scala
@@ -37,6 +37,9 @@ class VeloxConfig(conf: SQLConf) extends GlutenConfig(conf) {
   def veloxResizeBatchesShuffleOutput: Boolean =
     getConf(COLUMNAR_VELOX_RESIZE_BATCHES_SHUFFLE_OUTPUT)
 
+  def enableHashShuffleReaderStreamMerge: Boolean =
+    getConf(COLUMNAR_VELOX_HASH_SHUFFLE_READER_STREAM_MERGE_ENABLED)
+
   case class ResizeRange(min: Int, max: Int) {
     assert(max >= min)
     assert(min > 0, "Min batch size should be larger than 0")
@@ -322,6 +325,20 @@ object VeloxConfig extends ConfigRegistry {
       .booleanConf
       .createWithDefault(false)
 
+  val COLUMNAR_VELOX_HASH_SHUFFLE_READER_STREAM_MERGE_ENABLED =
+    
buildConf("spark.gluten.sql.columnar.backend.velox.hashShuffle.reader.streamMerge.enabled")
+      .doc(
+        "Enables a reader-side raw payload merge fast path for plain hash 
shuffle payloads " +
+          "within each shuffle input stream. This path merges payload buffers 
before Velox " +
+          "vectors are materialized, so it has lower per-batch overhead than 
generic " +
+          "VeloxResizeBatchesExec resizing, but it only covers plain payloads. 
Complex types " +
+          "and dictionary-encoded payloads are not merged by this path. " +
+          "VeloxResizeBatchesExec can still be enabled separately as a generic 
complement " +
+          "for types and encodings not covered by this fast path. If false, 
each hash " +
+          "shuffle payload is returned as its own columnar batch.")
+      .booleanConf
+      .createWithDefault(false)
+
   val COLUMNAR_VELOX_RESIZE_BATCHES_SHUFFLE_INPUT_MIN_SIZE =
     
buildConf("spark.gluten.sql.columnar.backend.velox.resizeBatches.shuffleInput.minSize")
       .doc(
diff --git 
a/backends-velox/src/main/scala/org/apache/gluten/vectorized/ColumnarBatchSerializer.scala
 
b/backends-velox/src/main/scala/org/apache/gluten/vectorized/ColumnarBatchSerializer.scala
index 3b5fce63f8..284d931ecc 100644
--- 
a/backends-velox/src/main/scala/org/apache/gluten/vectorized/ColumnarBatchSerializer.scala
+++ 
b/backends-velox/src/main/scala/org/apache/gluten/vectorized/ColumnarBatchSerializer.scala
@@ -17,7 +17,7 @@
 package org.apache.gluten.vectorized
 
 import org.apache.gluten.backendsapi.BackendsApiManager
-import org.apache.gluten.config.{GlutenConfig, ShuffleWriterType}
+import org.apache.gluten.config.{GlutenConfig, ShuffleWriterType, VeloxConfig}
 import org.apache.gluten.iterator.ClosableIterator
 import org.apache.gluten.memory.arrow.alloc.ArrowBufferAllocators
 import org.apache.gluten.runtime.Runtimes
@@ -104,6 +104,7 @@ private class ColumnarBatchSerializerInstanceImpl(
     val batchSize = GlutenConfig.get.maxBatchSize
     val readerBufferSize = GlutenConfig.get.columnarShuffleReaderBufferSize
     val deserializerBufferSize = 
GlutenConfig.get.columnarSortShuffleDeserializerBufferSize
+    val enableHashShuffleReaderStreamMerge = 
VeloxConfig.get.enableHashShuffleReaderStreamMerge
     val shuffleReaderHandle = jniWrapper.make(
       cSchema.memoryAddress(),
       compressionCodec,
@@ -111,7 +112,9 @@ private class ColumnarBatchSerializerInstanceImpl(
       batchSize,
       readerBufferSize,
       deserializerBufferSize,
-      shuffleWriterType.name)
+      shuffleWriterType.name,
+      enableHashShuffleReaderStreamMerge
+    )
     // Close shuffle reader instance as lately as the end of task processing,
     // since the native reader could hold a reference to memory pool that
     // was used to create all buffers read from shuffle reader. The pool
diff --git a/cpp/core/jni/JniWrapper.cc b/cpp/core/jni/JniWrapper.cc
index 9e194be6ea..70689c950a 100644
--- a/cpp/core/jni/JniWrapper.cc
+++ b/cpp/core/jni/JniWrapper.cc
@@ -1211,7 +1211,8 @@ JNIEXPORT jlong JNICALL 
Java_org_apache_gluten_vectorized_ShuffleReaderJniWrappe
     jint batchSize,
     jlong readerBufferSize,
     jlong deserializerBufferSize,
-    jstring shuffleWriterType) {
+    jstring shuffleWriterType,
+    jboolean enableHashShuffleReaderStreamMerge) {
   JNI_METHOD_START
   auto ctx = getRuntime(env, wrapper);
 
@@ -1223,6 +1224,7 @@ JNIEXPORT jlong JNICALL 
Java_org_apache_gluten_vectorized_ShuffleReaderJniWrappe
   options.batchSize = batchSize;
   options.readerBufferSize = readerBufferSize;
   options.deserializerBufferSize = deserializerBufferSize;
+  options.enableHashShuffleReaderStreamMerge = 
enableHashShuffleReaderStreamMerge;
 
   options.shuffleWriterType = 
ShuffleWriter::stringToType(jStringToCString(env, shuffleWriterType));
   std::shared_ptr<arrow::Schema> schema =
diff --git a/cpp/core/shuffle/Options.h b/cpp/core/shuffle/Options.h
index 1d7f9ad9f9..ea3aff10cf 100644
--- a/cpp/core/shuffle/Options.h
+++ b/cpp/core/shuffle/Options.h
@@ -63,6 +63,10 @@ struct ShuffleReaderOptions {
 
   // Buffer size when deserializing rows into columnar batches. Only used for 
sort-based shuffle.
   int64_t deserializerBufferSize = kDefaultDeserializerBufferSize;
+
+  // Whether to enable the reader-side raw payload merge fast path for plain 
hash shuffle payloads within one input
+  // stream.
+  bool enableHashShuffleReaderStreamMerge = false;
 };
 
 struct ShuffleWriterOptions {
diff --git a/cpp/velox/compute/VeloxRuntime.cc 
b/cpp/velox/compute/VeloxRuntime.cc
index 1e2cc6f308..e3eac17a22 100644
--- a/cpp/velox/compute/VeloxRuntime.cc
+++ b/cpp/velox/compute/VeloxRuntime.cc
@@ -585,7 +585,8 @@ std::shared_ptr<ShuffleReader> 
VeloxRuntime::createShuffleReader(
       options.readerBufferSize,
       options.deserializerBufferSize,
       memoryManager(),
-      options.shuffleWriterType);
+      options.shuffleWriterType,
+      options.enableHashShuffleReaderStreamMerge);
 
   return std::make_shared<VeloxShuffleReader>(std::move(deserializerFactory));
 }
diff --git a/cpp/velox/shuffle/VeloxShuffleReader.cc 
b/cpp/velox/shuffle/VeloxShuffleReader.cc
index 2e5f1f625a..a469d5c770 100644
--- a/cpp/velox/shuffle/VeloxShuffleReader.cc
+++ b/cpp/velox/shuffle/VeloxShuffleReader.cc
@@ -471,6 +471,7 @@ 
VeloxHashShuffleReaderDeserializer::VeloxHashShuffleReaderDeserializer(
     VeloxMemoryManager* memoryManager,
     std::vector<bool> isValidityBuffer,
     bool hasComplexType,
+    bool enableStreamMerge,
     int64_t& deserializeTime,
     int64_t& decompressTime)
     : streamReader_(streamReader),
@@ -482,12 +483,17 @@ 
VeloxHashShuffleReaderDeserializer::VeloxHashShuffleReaderDeserializer(
       memoryManager_(memoryManager),
       isValidityBuffer_(std::move(isValidityBuffer)),
       hasComplexType_(hasComplexType),
+      enableStreamMerge_(enableStreamMerge),
       deserializeTime_(deserializeTime),
       decompressTime_(decompressTime) {}
 
 bool VeloxHashShuffleReaderDeserializer::shouldSkipMerge() const {
-  // Complex type or dictionary encodings do not support merging.
-  return hasComplexType_ || !dictionaryFields_.empty();
+  // Stream merge is a reader-side raw payload fast path: for plain payloads it
+  // concatenates buffers before Velox vectors are materialized, avoiding the 
generic
+  // RowVector append cost paid by VeloxResizeBatchesExec. Keep complex and 
dictionary
+  // payloads on the existing per-payload path; VeloxResizeBatchesExec can be 
enabled
+  // separately as the generic complement for those cases.
+  return !enableStreamMerge_ || hasComplexType_ || !dictionaryFields_.empty();
 }
 
 bool VeloxHashShuffleReaderDeserializer::resolveNextBlockType() {
@@ -912,7 +918,8 @@ 
VeloxShuffleReaderDeserializerFactory::VeloxShuffleReaderDeserializerFactory(
     int64_t readerBufferSize,
     int64_t deserializerBufferSize,
     VeloxMemoryManager* memoryManager,
-    ShuffleWriterType shuffleWriterType)
+    ShuffleWriterType shuffleWriterType,
+    bool enableHashShuffleReaderStreamMerge)
     : schema_(schema),
       codec_(codec),
       veloxCompressionType_(veloxCompressionType),
@@ -921,7 +928,8 @@ 
VeloxShuffleReaderDeserializerFactory::VeloxShuffleReaderDeserializerFactory(
       readerBufferSize_(readerBufferSize),
       deserializerBufferSize_(deserializerBufferSize),
       memoryManager_(memoryManager),
-      shuffleWriterType_(shuffleWriterType) {
+      shuffleWriterType_(shuffleWriterType),
+      enableHashShuffleReaderStreamMerge_(enableHashShuffleReaderStreamMerge) {
   initFromSchema();
 }
 
@@ -952,6 +960,7 @@ std::unique_ptr<ColumnarBatchIterator> 
VeloxShuffleReaderDeserializerFactory::cr
           memoryManager_,
           isValidityBuffer_,
           hasComplexType_,
+          enableHashShuffleReaderStreamMerge_,
           deserializeTime_,
           decompressTime_);
     case ShuffleWriterType::kSortShuffle:
diff --git a/cpp/velox/shuffle/VeloxShuffleReader.h 
b/cpp/velox/shuffle/VeloxShuffleReader.h
index 0b08fe675a..f92f0a2cc3 100644
--- a/cpp/velox/shuffle/VeloxShuffleReader.h
+++ b/cpp/velox/shuffle/VeloxShuffleReader.h
@@ -39,6 +39,7 @@ class VeloxHashShuffleReaderDeserializer final : public 
ColumnarBatchIterator {
       VeloxMemoryManager* memoryManager,
       std::vector<bool> isValidityBuffer,
       bool hasComplexType,
+      bool enableStreamMerge,
       int64_t& deserializeTime,
       int64_t& decompressTime);
 
@@ -60,6 +61,7 @@ class VeloxHashShuffleReaderDeserializer final : public 
ColumnarBatchIterator {
   VeloxMemoryManager* memoryManager_;
   std::vector<bool> isValidityBuffer_;
   bool hasComplexType_;
+  bool enableStreamMerge_;
 
   int64_t& deserializeTime_;
   int64_t& decompressTime_;
@@ -171,7 +173,8 @@ class VeloxShuffleReaderDeserializerFactory {
       int64_t readerBufferSize,
       int64_t deserializerBufferSize,
       VeloxMemoryManager* memoryManager,
-      ShuffleWriterType shuffleWriterType);
+      ShuffleWriterType shuffleWriterType,
+      bool enableHashShuffleReaderStreamMerge = false);
 
   std::unique_ptr<ColumnarBatchIterator> createDeserializer(const 
std::shared_ptr<StreamReader>& streamReader);
 
@@ -195,6 +198,7 @@ class VeloxShuffleReaderDeserializerFactory {
   bool hasComplexType_{false};
 
   ShuffleWriterType shuffleWriterType_;
+  bool enableHashShuffleReaderStreamMerge_;
 
   int64_t deserializeTime_{0};
   int64_t decompressTime_{0};
diff --git a/cpp/velox/tests/VeloxShuffleWriterTest.cc 
b/cpp/velox/tests/VeloxShuffleWriterTest.cc
index e0d900b2b1..18046629d4 100644
--- a/cpp/velox/tests/VeloxShuffleWriterTest.cc
+++ b/cpp/velox/tests/VeloxShuffleWriterTest.cc
@@ -19,6 +19,7 @@
 #include <arrow/io/api.h>
 
 #include <cstring>
+#include <optional>
 
 #include "shuffle/Payload.h"
 #include "shuffle/VeloxHashShuffleWriter.h"
@@ -537,20 +538,36 @@ class VeloxShuffleReaderStreamMergeTest : public 
::testing::Test, public VeloxSh
   std::vector<RowVectorPtr> readStreams(
       const RowTypePtr& rowType,
       int32_t batchSize,
-      std::vector<std::shared_ptr<arrow::io::InputStream>> streams) {
+      std::vector<std::shared_ptr<arrow::io::InputStream>> streams,
+      std::optional<bool> enableStreamMerge = std::nullopt) {
     const auto schema = toArrowSchema(rowType, 
getDefaultMemoryManager()->getLeafMemoryPool().get());
     std::shared_ptr<arrow::util::Codec> codec =
         createCompressionCodec(arrow::Compression::UNCOMPRESSED, 
CodecBackend::NONE);
-    auto deserializerFactory = 
std::make_unique<VeloxShuffleReaderDeserializerFactory>(
-        schema,
-        codec,
-        arrowCompressionTypeToVelox(arrow::Compression::UNCOMPRESSED),
-        rowType,
-        batchSize,
-        kDefaultReadBufferSize,
-        kDefaultDeserializerBufferSize,
-        getDefaultMemoryManager(),
-        ShuffleWriterType::kHashShuffle);
+    std::unique_ptr<VeloxShuffleReaderDeserializerFactory> deserializerFactory;
+    if (enableStreamMerge.has_value()) {
+      deserializerFactory = 
std::make_unique<VeloxShuffleReaderDeserializerFactory>(
+          schema,
+          codec,
+          arrowCompressionTypeToVelox(arrow::Compression::UNCOMPRESSED),
+          rowType,
+          batchSize,
+          kDefaultReadBufferSize,
+          kDefaultDeserializerBufferSize,
+          getDefaultMemoryManager(),
+          ShuffleWriterType::kHashShuffle,
+          enableStreamMerge.value());
+    } else {
+      deserializerFactory = 
std::make_unique<VeloxShuffleReaderDeserializerFactory>(
+          schema,
+          codec,
+          arrowCompressionTypeToVelox(arrow::Compression::UNCOMPRESSED),
+          rowType,
+          batchSize,
+          kDefaultReadBufferSize,
+          kDefaultDeserializerBufferSize,
+          getDefaultMemoryManager(),
+          ShuffleWriterType::kHashShuffle);
+    }
 
     auto reader = 
std::make_shared<VeloxShuffleReader>(std::move(deserializerFactory));
     const auto iter = 
reader->read(std::make_shared<MultiStreamReader>(std::move(streams)));
@@ -595,7 +612,7 @@ TEST_F(VeloxShuffleReaderStreamMergeTest, 
hashReaderMergesWithinStream) {
 
   std::vector<std::shared_ptr<arrow::io::InputStream>> streams = 
{writeSinglePartitionStream(inputs)};
 
-  auto output = readStreams(facebook::velox::asRowType(inputs[0]->type()), 
kBatchSize, std::move(streams));
+  auto output = readStreams(facebook::velox::asRowType(inputs[0]->type()), 
kBatchSize, std::move(streams), true);
 
   ASSERT_EQ(output.size(), 2);
   ASSERT_EQ(output[0]->size(), kBatchSize);
@@ -604,6 +621,22 @@ TEST_F(VeloxShuffleReaderStreamMergeTest, 
hashReaderMergesWithinStream) {
   facebook::velox::test::assertEqualVectors(inputs[3], output[1]);
 }
 
+TEST_F(VeloxShuffleReaderStreamMergeTest, hashReaderDoesNotMergeByDefault) {
+  constexpr int32_t kBatchSize = 100;
+  std::vector<RowVectorPtr> inputs = {
+      makeRowVector({makeFlatVector<int32_t>({1, 2})}),
+      makeRowVector({makeFlatVector<int32_t>({3, 4})}),
+      makeRowVector({makeFlatVector<int32_t>({5, 6})})};
+
+  const auto rowType = facebook::velox::asRowType(inputs[0]->type());
+  auto output = readStreams(rowType, kBatchSize, 
{writeSinglePartitionStream(inputs)});
+
+  ASSERT_EQ(output.size(), inputs.size());
+  for (size_t i = 0; i < inputs.size(); ++i) {
+    facebook::velox::test::assertEqualVectors(inputs[i], output[i]);
+  }
+}
+
 TEST_F(VeloxShuffleReaderStreamMergeTest, 
hashReaderMergesWithinEachStreamOnly) {
   constexpr int32_t kBatchSize = 100;
   std::vector<RowVectorPtr> inputs = {
@@ -615,7 +648,7 @@ TEST_F(VeloxShuffleReaderStreamMergeTest, 
hashReaderMergesWithinEachStreamOnly)
   std::vector<std::shared_ptr<arrow::io::InputStream>> streams = {
       writeSinglePartitionStream({inputs[0], inputs[1]}), 
writeSinglePartitionStream({inputs[2], inputs[3]})};
 
-  auto output = readStreams(facebook::velox::asRowType(inputs[0]->type()), 
kBatchSize, std::move(streams));
+  auto output = readStreams(facebook::velox::asRowType(inputs[0]->type()), 
kBatchSize, std::move(streams), true);
 
   ASSERT_EQ(output.size(), 2);
   facebook::velox::test::assertEqualVectors(mergeRowVectors({inputs[0], 
inputs[1]}), output[0]);
@@ -636,7 +669,7 @@ TEST_F(VeloxShuffleReaderStreamMergeTest, 
hashReaderDoesNotMergeAcrossStreams) {
     streams.push_back(writeSinglePartitionStream(input));
   }
 
-  auto output = readStreams(facebook::velox::asRowType(inputs[0]->type()), 
kBatchSize, std::move(streams));
+  auto output = readStreams(facebook::velox::asRowType(inputs[0]->type()), 
kBatchSize, std::move(streams), true);
 
   ASSERT_EQ(output.size(), inputs.size());
   for (size_t i = 0; i < inputs.size(); ++i) {
@@ -661,7 +694,7 @@ TEST_F(VeloxShuffleReaderStreamMergeTest, 
hashReaderCarriesOverPayloadThatWouldE
 
   std::vector<std::shared_ptr<arrow::io::InputStream>> streams = 
{writeSinglePartitionStream(inputs)};
 
-  auto output = readStreams(facebook::velox::asRowType(inputs[0]->type()), 
kBatchSize, std::move(streams));
+  auto output = readStreams(facebook::velox::asRowType(inputs[0]->type()), 
kBatchSize, std::move(streams), true);
 
   ASSERT_EQ(output.size(), 2);
   facebook::velox::test::assertEqualVectors(inputs[0], output[0]);
@@ -675,7 +708,7 @@ TEST_F(VeloxShuffleReaderStreamMergeTest, 
hashReaderFlushesMergedRowsBeforeDicti
   std::vector<std::shared_ptr<arrow::io::InputStream>> streams = {
       writeSinglePartitionStream(plainInput), 
writeSinglePartitionStream(dictionaryInput, true)};
 
-  auto output = readStreams(facebook::velox::asRowType(plainInput->type()), 
kBatchSize, std::move(streams));
+  auto output = readStreams(facebook::velox::asRowType(plainInput->type()), 
kBatchSize, std::move(streams), true);
 
   ASSERT_EQ(output.size(), 2);
   facebook::velox::test::assertEqualVectors(plainInput, output[0]);
@@ -694,7 +727,7 @@ TEST_F(VeloxShuffleReaderStreamMergeTest, 
hashReaderDoesNotMergeComplexTypeStrea
     streams.push_back(writeSinglePartitionStream(input));
   }
 
-  auto output = readStreams(facebook::velox::asRowType(inputs[0]->type()), 
kBatchSize, std::move(streams));
+  auto output = readStreams(facebook::velox::asRowType(inputs[0]->type()), 
kBatchSize, std::move(streams), true);
 
   ASSERT_EQ(output.size(), inputs.size());
   facebook::velox::test::assertEqualVectors(inputs[0], output[0]);
diff --git a/docs/velox-configuration.md b/docs/velox-configuration.md
index a608dfbc45..a0c0691e2f 100644
--- a/docs/velox-configuration.md
+++ b/docs/velox-configuration.md
@@ -9,77 +9,78 @@ nav_order: 16
 
 ## Gluten Velox backend configurations
 
-|                                       Key                                    
    |      Default      |                                                       
                                                                                
                                                                               
Description                                                                     
                                                                                
               [...]
-|----------------------------------------------------------------------------------|-------------------|----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
 [...]
-| spark.gluten.sql.columnar.backend.velox.IOThreads                            
    | &lt;undefined&gt; | The Size of the IO thread pool in the Connector. This 
thread pool is used for split preloading and DirectBufferedInput. By default, 
the value is the same as the maximum task slots per Spark executor.             
                                                                                
                                                                                
                [...]
-| spark.gluten.sql.columnar.backend.velox.SplitPreloadPerDriver                
    | 2                 | The split preload per task                            
                                                                                
                                                                                
                                                                                
                                                                                
              [...]
-| spark.gluten.sql.columnar.backend.velox.abandonPartialAggregationMinPct      
    | 90                | If partial aggregation aggregationPct greater than 
this value, partial aggregation may be early abandoned. Note: this option only 
works when flushable partial aggregation is enabled. Ignored when 
spark.gluten.sql.columnar.backend.velox.flushablePartialAggregation=false.      
                                                                                
                                [...]
-| spark.gluten.sql.columnar.backend.velox.abandonPartialAggregationMinRows     
    | 100000            | If partial aggregation input rows number greater than 
this value,  partial aggregation may be early abandoned. Note: this option only 
works when flushable partial aggregation is enabled. Ignored when 
spark.gluten.sql.columnar.backend.velox.flushablePartialAggregation=false.      
                                                                                
                            [...]
-| spark.gluten.sql.columnar.backend.velox.asyncTimeoutOnTaskStopping           
    | 30000ms           | Timeout in milliseconds when waiting for 
runtime-scoped async work to finish during teardown.                            
                                                                                
                                                                                
                                                                                
                           [...]
-| spark.gluten.sql.columnar.backend.velox.cacheEnabled                         
    | false             | Enable Velox cache, default off. It's recommended to 
enablesoft-affinity as well when enable velox cache.                            
                                                                                
                                                                                
                                                                                
               [...]
-| spark.gluten.sql.columnar.backend.velox.cachePrefetchMinPct                  
    | 0                 | Set prefetch cache min pct for velox file scan        
                                                                                
                                                                                
                                                                                
                                                                                
              [...]
-| spark.gluten.sql.columnar.backend.velox.checkUsageLeak                       
    | true              | Enable check memory usage leak.                       
                                                                                
                                                                                
                                                                                
                                                                                
              [...]
-| spark.gluten.sql.columnar.backend.velox.cudf.batchSize                       
    | 2147483647        | Cudf input batch size after shuffle reader            
                                                                                
                                                                                
                                                                                
                                                                                
              [...]
-| spark.gluten.sql.columnar.backend.velox.cudf.enableTableScan                 
    | false             | Enable cudf table scan                                
                                                                                
                                                                                
                                                                                
                                                                                
              [...]
-| spark.gluten.sql.columnar.backend.velox.cudf.enableValidation                
    | true              | Heuristics you can apply to validate a cuDF/GPU plan 
and only offload when the entire stage can be fully and profitably executed on 
GPU                                                                             
                                                                                
                                                                                
                [...]
-| spark.gluten.sql.columnar.backend.velox.cudf.memoryPercent                   
    | 50                | The initial percent of GPU memory to allocate for 
memory resource for one thread.                                                 
                                                                                
                                                                                
                                                                                
                  [...]
-| spark.gluten.sql.columnar.backend.velox.cudf.memoryResource                  
    | async             | GPU RMM memory resource.                              
                                                                                
                                                                                
                                                                                
                                                                                
              [...]
-| spark.gluten.sql.columnar.backend.velox.cudf.shuffleMaxPrefetchBytes         
    | 1028MB            | Maximum bytes to prefetch in CPU memory during GPU 
shuffle read while waiting for GPU available.                                   
                                                                                
                                                                                
                                                                                
                 [...]
-| spark.gluten.sql.columnar.backend.velox.directorySizeGuess                   
    | 32KB              | Deprecated, rename to 
spark.gluten.sql.columnar.backend.velox.footerEstimatedSize                     
                                                                                
                                                                                
                                                                                
                                              [...]
-| spark.gluten.sql.columnar.backend.velox.enableTimestampNtzValidation         
    | true              | Enable validation fallback for TimestampNTZ type. 
When true (default), any plan containing TimestampNTZ will fall back to Spark 
execution. Set to false during development/testing of TimestampNTZ support to 
allow native execution.                                                         
                                                                                
                      [...]
-| spark.gluten.sql.columnar.backend.velox.fileHandleCacheEnabled               
    | false             | Disables caching if false. File handle cache should 
be disabled if files are mutable, i.e. file content may change while file path 
stays the same.                                                                 
                                                                                
                                                                                
                 [...]
-| spark.gluten.sql.columnar.backend.velox.filePreloadThreshold                 
    | 1MB               | Set the file preload threshold for velox file scan, 
refer to Velox's file-preload-threshold                                         
                                                                                
                                                                                
                                                                                
                [...]
-| spark.gluten.sql.columnar.backend.velox.floatingPointMode                    
    | loose             | Config used to control the tolerance of floating 
point operations alignment with Spark. When the mode is set to strict, flushing 
is disabled for sum(float/double)and avg(float/double). When set to loose, 
flushing will be enabled.                                                       
                                                                                
                        [...]
-| spark.gluten.sql.columnar.backend.velox.flushablePartialAggregation          
    | true              | Enable flushable aggregation. If true, Gluten will 
try converting regular aggregation into Velox's flushable aggregation when 
applicable. A flushable aggregation could emit intermediate result at anytime 
when memory is full / data reduction ratio is low.                              
                                                                                
                        [...]
-| spark.gluten.sql.columnar.backend.velox.footerEstimatedSize                  
    | 32KB              | Set the footer estimated size for velox file scan, 
refer to Velox's footer-estimated-size                                          
                                                                                
                                                                                
                                                                                
                 [...]
-| 
spark.gluten.sql.columnar.backend.velox.hashProbe.bloomFilterPushdown.maxSize   
 | 0b                | The maximum byte size of Bloom filter that can be 
generated from hash probe. When set to 0, no Bloom filter will be generated. To 
achieve optimal performance, this should not be much larger than the CPU cache 
size on the host.                                                               
                                                                                
                    [...]
-| 
spark.gluten.sql.columnar.backend.velox.hashProbe.dynamicFilterPushdown.enabled 
 | true              | Whether hash probe can generate any dynamic filter 
(including Bloom filter) and push down to upstream operators.                   
                                                                                
                                                                                
                                                                                
                 [...]
-| spark.gluten.sql.columnar.backend.velox.loadQuantum                          
    | 256MB             | Set the load quantum for velox file scan, recommend 
to use the default value (256MB) for performance consideration. If Velox cache 
is enabled, it can be 8MB at most.                                              
                                                                                
                                                                                
                 [...]
-| spark.gluten.sql.columnar.backend.velox.maxCoalescedBytes                    
    | 64MB              | Set the max coalesced bytes for velox file scan       
                                                                                
                                                                                
                                                                                
                                                                                
              [...]
-| spark.gluten.sql.columnar.backend.velox.maxCoalescedDistance                 
    | 512KB             | Set the max coalesced distance bytes for velox file 
scan                                                                            
                                                                                
                                                                                
                                                                                
                [...]
-| spark.gluten.sql.columnar.backend.velox.maxCompiledRegexes                   
    | 100               | Controls maximum number of compiled regular 
expression patterns per function instance per thread of execution.              
                                                                                
                                                                                
                                                                                
                        [...]
-| spark.gluten.sql.columnar.backend.velox.maxExtendedPartialAggregationMemory  
    | &lt;undefined&gt; | Set the max extended memory of partial aggregation in 
bytes. When this option is set to a value greater than 0, it will override 
spark.gluten.sql.columnar.backend.velox.maxExtendedPartialAggregationMemoryRatio.
 Note: this option only works when flushable partial aggregation is enabled. 
Ignored when 
spark.gluten.sql.columnar.backend.velox.flushablePartialAggregation=false.      
        [...]
-| 
spark.gluten.sql.columnar.backend.velox.maxExtendedPartialAggregationMemoryRatio
 | 0.15              | Set the max extended memory of partial aggregation as 
maxExtendedPartialAggregationMemoryRatio of offheap size. Note: this option 
only works when flushable partial aggregation is enabled. Ignored when 
spark.gluten.sql.columnar.backend.velox.flushablePartialAggregation=false.      
                                                                                
                           [...]
-| spark.gluten.sql.columnar.backend.velox.maxPartialAggregationMemory          
    | &lt;undefined&gt; | Set the max memory of partial aggregation in bytes. 
When this option is set to a value greater than 0, it will override 
spark.gluten.sql.columnar.backend.velox.maxPartialAggregationMemoryRatio. Note: 
this option only works when flushable partial aggregation is enabled. Ignored 
when spark.gluten.sql.columnar.backend.velox.flushablePartialAggregation=false. 
                              [...]
-| spark.gluten.sql.columnar.backend.velox.maxPartialAggregationMemoryRatio     
    | 0.1               | Set the max memory of partial aggregation as 
maxPartialAggregationMemoryRatio of offheap size. Note: this option only works 
when flushable partial aggregation is enabled. Ignored when 
spark.gluten.sql.columnar.backend.velox.flushablePartialAggregation=false.      
                                                                                
                                            [...]
-| spark.gluten.sql.columnar.backend.velox.maxPartitionsPerWritersSession       
    | 10000             | Maximum number of partitions per a single table 
writer instance.                                                                
                                                                                
                                                                                
                                                                                
                    [...]
-| spark.gluten.sql.columnar.backend.velox.maxSpillBytes                        
    | 100G              | The maximum file size of a query                      
                                                                                
                                                                                
                                                                                
                                                                                
              [...]
-| spark.gluten.sql.columnar.backend.velox.maxSpillFileSize                     
    | 1GB               | The maximum size of a single spill file created       
                                                                                
                                                                                
                                                                                
                                                                                
              [...]
-| spark.gluten.sql.columnar.backend.velox.maxSpillLevel                        
    | 4                 | The max allowed spilling level with zero being the 
initial spilling level                                                          
                                                                                
                                                                                
                                                                                
                 [...]
-| spark.gluten.sql.columnar.backend.velox.maxSpillRunRows                      
    | 3M                | The maximum row size of a single spill run            
                                                                                
                                                                                
                                                                                
                                                                                
              [...]
-| spark.gluten.sql.columnar.backend.velox.maxTargetFileSize                    
    | 0b                | The target file size for each output file when 
writing data. 0 means no limit on target file size, and the actual file size 
will be determined by other factors such as max partition number and shuffle 
batch size.                                                                     
                                                                                
                           [...]
-| spark.gluten.sql.columnar.backend.velox.memCacheSize                         
    | 1GB               | The memory cache size                                 
                                                                                
                                                                                
                                                                                
                                                                                
              [...]
-| spark.gluten.sql.columnar.backend.velox.memInitCapacity                      
    | 8MB               | The initial memory capacity to reserve for a newly 
created Velox query memory pool.                                                
                                                                                
                                                                                
                                                                                
                 [...]
-| 
spark.gluten.sql.columnar.backend.velox.memoryPoolCapacityTransferAcrossTasks   
 | true              | Whether to allow memory capacity transfer between memory 
pools from different tasks.                                                     
                                                                                
                                                                                
                                                                                
           [...]
-| spark.gluten.sql.columnar.backend.velox.memoryUseHugePages                   
    | false             | Use explicit huge pages for Velox memory allocation.  
                                                                                
                                                                                
                                                                                
                                                                                
              [...]
-| spark.gluten.sql.columnar.backend.velox.orc.scan.enabled                     
    | true              | Enable velox orc scan. If disabled, vanilla spark orc 
scan will be used.                                                              
                                                                                
                                                                                
                                                                                
              [...]
-| spark.gluten.sql.columnar.backend.velox.orcUseColumnNames                    
    | true              | Maps table field names to file field names using 
names, not indices for ORC files.                                               
                                                                                
                                                                                
                                                                                
                   [...]
-| spark.gluten.sql.columnar.backend.velox.parquet.pageSizeBytes                
    | 1MB               | The page size in bytes is for compression.            
                                                                                
                                                                                
                                                                                
                                                                                
              [...]
-| spark.gluten.sql.columnar.backend.velox.parquetUseColumnNames                
    | true              | Maps table field names to file field names using 
names, not indices for Parquet files.                                           
                                                                                
                                                                                
                                                                                
                   [...]
-| spark.gluten.sql.columnar.backend.velox.prefetchRowGroups                    
    | 1                 | Set the prefetch row groups for velox file scan       
                                                                                
                                                                                
                                                                                
                                                                                
              [...]
-| spark.gluten.sql.columnar.backend.velox.queryTraceEnabled                    
    | false             | Enable query tracing flag.                            
                                                                                
                                                                                
                                                                                
                                                                                
              [...]
-| spark.gluten.sql.columnar.backend.velox.reclaimMaxWaitMs                     
    | 3600000ms         | The max time in ms to wait for memory reclaim.        
                                                                                
                                                                                
                                                                                
                                                                                
              [...]
-| spark.gluten.sql.columnar.backend.velox.resizeBatches.shuffleInput           
    | true              | If true, combine small columnar batches together 
before sending to shuffle. The default minimum output batch size is equal to 
0.25 * spark.gluten.sql.columnar.maxBatchSize                                   
                                                                                
                                                                                
                      [...]
-| spark.gluten.sql.columnar.backend.velox.resizeBatches.shuffleInput.minSize   
    | &lt;undefined&gt; | The minimum batch size for shuffle. If size of an 
input batch is smaller than the value, it will be combined with other batches 
before sending to shuffle. Only functions when 
spark.gluten.sql.columnar.backend.velox.resizeBatches.shuffleInput is set to 
true. Default value: 0.25 * <max batch size>                                    
                                                        [...]
-| 
spark.gluten.sql.columnar.backend.velox.resizeBatches.shuffleInputOuptut.minSize
 | &lt;undefined&gt; | The minimum batch size for shuffle input and output. If 
size of an input batch is smaller than the value, it will be combined with 
other batches before sending to shuffle. The same applies for batches output by 
shuffle read. Only functions when 
spark.gluten.sql.columnar.backend.velox.resizeBatches.shuffleInput or 
spark.gluten.sql.columnar.backend.velox.resizeBatches.shuffleOutput is s [...]
-| spark.gluten.sql.columnar.backend.velox.resizeBatches.shuffleOutput          
    | false             | If true, combine small columnar batches together 
right after shuffle read. The default minimum output batch size is equal to 
0.25 * spark.gluten.sql.columnar.maxBatchSize                                   
                                                                                
                                                                                
                       [...]
-| spark.gluten.sql.columnar.backend.velox.showTaskMetricsWhenFinished          
    | false             | Show velox full task metrics when finished.           
                                                                                
                                                                                
                                                                                
                                                                                
              [...]
-| spark.gluten.sql.columnar.backend.velox.spillFileSystem                      
    | local             | The filesystem used to store spill data. local: The 
local file system. heap-over-local: Write file to JVM heap if having extra heap 
space. Otherwise write to local file system.                                    
                                                                                
                                                                                
                [...]
-| spark.gluten.sql.columnar.backend.velox.spillStrategy                        
    | auto              | none: Disable spill on Velox backend; auto: Let Spark 
memory manager manage Velox's spilling                                          
                                                                                
                                                                                
                                                                                
              [...]
-| spark.gluten.sql.columnar.backend.velox.ssdCacheIOThreads                    
    | 1                 | The IO threads for cache promoting                    
                                                                                
                                                                                
                                                                                
                                                                                
              [...]
-| spark.gluten.sql.columnar.backend.velox.ssdCachePath                         
    | /tmp              | The folder to store the cache files, better on SSD    
                                                                                
                                                                                
                                                                                
                                                                                
              [...]
-| spark.gluten.sql.columnar.backend.velox.ssdCacheShards                       
    | 1                 | The cache shards                                      
                                                                                
                                                                                
                                                                                
                                                                                
              [...]
-| spark.gluten.sql.columnar.backend.velox.ssdCacheSize                         
    | 1GB               | The SSD cache size, will do memory caching only if 
this value = 0                                                                  
                                                                                
                                                                                
                                                                                
                 [...]
-| spark.gluten.sql.columnar.backend.velox.ssdCheckpointIntervalBytes           
    | 0                 | Checkpoint after every 'checkpointIntervalBytes' for 
SSD cache. 0 means no checkpointing.                                            
                                                                                
                                                                                
                                                                                
               [...]
-| spark.gluten.sql.columnar.backend.velox.ssdChecksumEnabled                   
    | false             | If true, checksum write to SSD is enabled.            
                                                                                
                                                                                
                                                                                
                                                                                
              [...]
-| spark.gluten.sql.columnar.backend.velox.ssdChecksumReadVerificationEnabled   
    | false             | If true, checksum read verification from SSD is 
enabled.                                                                        
                                                                                
                                                                                
                                                                                
                    [...]
-| spark.gluten.sql.columnar.backend.velox.ssdDisableFileCow                    
    | false             | True if copy on write should be disabled.             
                                                                                
                                                                                
                                                                                
                                                                                
              [...]
-| spark.gluten.sql.columnar.backend.velox.ssdODirect                           
    | false             | The O_DIRECT flag for cache writing                   
                                                                                
                                                                                
                                                                                
                                                                                
              [...]
-| spark.gluten.sql.columnar.backend.velox.valueStream.dynamicFilter.enabled    
    | false             | Whether to apply dynamic filters pushed down from 
hash probe in the ValueStream (shuffle reader) operator to filter rows before 
they reach the hash join.                                                       
                                                                                
                                                                                
                    [...]
-| spark.gluten.sql.enable.enhancedFeatures                                     
    | true              | Enable some features including iceberg native write 
and other features.                                                             
                                                                                
                                                                                
                                                                                
                [...]
-| spark.gluten.sql.rewrite.castArrayToString                                   
    | true              | When true, rewrite `cast(array as String)` to 
`concat('[', array_join(array, ', ', null), ']')` to allow offloading to Velox. 
                                                                                
                                                                                
                                                                                
                      [...]
-| spark.gluten.velox.broadcast.build.targetBytesPerThread                      
    | 32MB              | It is used to calculate the number of hash table 
build threads. Based on our testing across various thresholds (1MB to 128MB), 
we recommend a value of 32MB or 64MB, as these consistently provided the most 
significant performance gains.                                                  
                                                                                
                       [...]
-| spark.gluten.velox.castFromVarcharAddTrimNode                                
    | false             | If true, will add a trim node which has the same 
semantic as vanilla Spark to CAST-from-varchar. Otherwise, do nothing.          
                                                                                
                                                                                
                                                                                
                   [...]
+|                                       Key                                    
    |      Default      |                                                       
                                                                                
                                                                                
                                                                          
Description                                                                     
                    [...]
+|----------------------------------------------------------------------------------|-------------------|----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
 [...]
+| spark.gluten.sql.columnar.backend.velox.IOThreads                            
    | &lt;undefined&gt; | The size of the IO thread pool in the Connector. This 
thread pool is used for split preloading and DirectBufferedInput. By default, 
the value is the same as the maximum task slots per Spark executor.             
                                                                                
                                                                                
                [...]
+| spark.gluten.sql.columnar.backend.velox.SplitPreloadPerDriver                
    | 2                 | The split preload per task                            
                                                                                
                                                                                
                                                                                
                                                                                
              [...]
+| spark.gluten.sql.columnar.backend.velox.abandonPartialAggregationMinPct      
    | 90                | If partial aggregation aggregationPct is greater than 
this value, partial aggregation may be early abandoned. Note: this option only 
works when flushable partial aggregation is enabled. Ignored when 
spark.gluten.sql.columnar.backend.velox.flushablePartialAggregation=false.      
                                                                                
                                [...]
+| spark.gluten.sql.columnar.backend.velox.abandonPartialAggregationMinRows     
    | 100000            | If the number of partial aggregation input rows is greater than 
this value, partial aggregation may be early abandoned. Note: this option only 
works when flushable partial aggregation is enabled. Ignored when 
spark.gluten.sql.columnar.backend.velox.flushablePartialAggregation=false.      
                                                                                
                            [...]
+| spark.gluten.sql.columnar.backend.velox.asyncTimeoutOnTaskStopping           
    | 30000ms           | Timeout in milliseconds when waiting for 
runtime-scoped async work to finish during teardown.                            
                                                                                
                                                                                
                                                                                
                           [...]
+| spark.gluten.sql.columnar.backend.velox.cacheEnabled                         
    | false             | Enable Velox cache, default off. It's recommended to 
enable soft-affinity as well when enabling Velox cache.                         
                                                                                
                                                                                
                                                                                
               [...]
+| spark.gluten.sql.columnar.backend.velox.cachePrefetchMinPct                  
    | 0                 | Set prefetch cache min pct for velox file scan        
                                                                                
                                                                                
                                                                                
                                                                                
              [...]
+| spark.gluten.sql.columnar.backend.velox.checkUsageLeak                       
    | true              | Enable check memory usage leak.                       
                                                                                
                                                                                
                                                                                
                                                                                
              [...]
+| spark.gluten.sql.columnar.backend.velox.cudf.batchSize                       
    | 2147483647        | Cudf input batch size after shuffle reader            
                                                                                
                                                                                
                                                                                
                                                                                
              [...]
+| spark.gluten.sql.columnar.backend.velox.cudf.enableTableScan                 
    | false             | Enable cudf table scan                                
                                                                                
                                                                                
                                                                                
                                                                                
              [...]
+| spark.gluten.sql.columnar.backend.velox.cudf.enableValidation                
    | true              | Heuristics you can apply to validate a cuDF/GPU plan 
and only offload when the entire stage can be fully and profitably executed on 
GPU                                                                             
                                                                                
                                                                                
                [...]
+| spark.gluten.sql.columnar.backend.velox.cudf.memoryPercent                   
    | 50                | The initial percent of GPU memory to allocate for 
memory resource for one thread.                                                 
                                                                                
                                                                                
                                                                                
                  [...]
+| spark.gluten.sql.columnar.backend.velox.cudf.memoryResource                  
    | async             | GPU RMM memory resource.                              
                                                                                
                                                                                
                                                                                
                                                                                
              [...]
+| spark.gluten.sql.columnar.backend.velox.cudf.shuffleMaxPrefetchBytes         
    | 1028MB            | Maximum bytes to prefetch in CPU memory during GPU 
shuffle read while waiting for the GPU to become available.                     
                                                                                
                                                                                
                                                                                
                 [...]
+| spark.gluten.sql.columnar.backend.velox.directorySizeGuess                   
    | 32KB              | Deprecated; renamed to 
spark.gluten.sql.columnar.backend.velox.footerEstimatedSize                     
                                                                                
                                                                                
                                                                                
                                              [...]
+| spark.gluten.sql.columnar.backend.velox.enableTimestampNtzValidation         
    | true              | Enable validation fallback for TimestampNTZ type. 
When true (default), any plan containing TimestampNTZ will fall back to Spark 
execution. Set to false during development/testing of TimestampNTZ support to 
allow native execution.                                                         
                                                                                
                      [...]
+| spark.gluten.sql.columnar.backend.velox.fileHandleCacheEnabled               
    | false             | Disables caching if false. File handle cache should 
be disabled if files are mutable, i.e. file content may change while file path 
stays the same.                                                                 
                                                                                
                                                                                
                 [...]
+| spark.gluten.sql.columnar.backend.velox.filePreloadThreshold                 
    | 1MB               | Set the file preload threshold for velox file scan, 
refer to Velox's file-preload-threshold                                         
                                                                                
                                                                                
                                                                                
                [...]
+| spark.gluten.sql.columnar.backend.velox.floatingPointMode                    
    | loose             | Config used to control the tolerance of floating 
point operations alignment with Spark. When the mode is set to strict, flushing 
is disabled for sum(float/double) and avg(float/double). When set to loose, 
flushing will be enabled.                                                       
                                                                                
                        [...]
+| spark.gluten.sql.columnar.backend.velox.flushablePartialAggregation          
    | true              | Enable flushable aggregation. If true, Gluten will 
try converting regular aggregation into Velox's flushable aggregation when 
applicable. A flushable aggregation could emit intermediate result at anytime 
when memory is full / data reduction ratio is low.                              
                                                                                
                        [...]
+| spark.gluten.sql.columnar.backend.velox.footerEstimatedSize                  
    | 32KB              | Set the footer estimated size for velox file scan, 
refer to Velox's footer-estimated-size                                          
                                                                                
                                                                                
                                                                                
                 [...]
+| 
spark.gluten.sql.columnar.backend.velox.hashProbe.bloomFilterPushdown.maxSize   
 | 0b                | The maximum byte size of Bloom filter that can be 
generated from hash probe. When set to 0, no Bloom filter will be generated. To 
achieve optimal performance, this should not be much larger than the CPU cache 
size on the host.                                                               
                                                                                
                    [...]
+| 
spark.gluten.sql.columnar.backend.velox.hashProbe.dynamicFilterPushdown.enabled 
 | true              | Whether hash probe can generate any dynamic filter 
(including Bloom filter) and push down to upstream operators.                   
                                                                                
                                                                                
                                                                                
                 [...]
+| 
spark.gluten.sql.columnar.backend.velox.hashShuffle.reader.streamMerge.enabled  
 | false             | Enables a reader-side raw payload merge fast path for 
plain hash shuffle payloads within each shuffle input stream. This path merges 
payload buffers before Velox vectors are materialized, so it has lower 
per-batch overhead than generic VeloxResizeBatchesExec resizing, but it only 
covers plain payloads. Complex types and dictionary-encoded payloads are not 
merged by this path. VeloxRes [...]
+| spark.gluten.sql.columnar.backend.velox.loadQuantum                          
    | 256MB             | Set the load quantum for velox file scan, recommend 
to use the default value (256MB) for performance consideration. If Velox cache 
is enabled, it can be 8MB at most.                                              
                                                                                
                                                                                
                 [...]
+| spark.gluten.sql.columnar.backend.velox.maxCoalescedBytes                    
    | 64MB              | Set the max coalesced bytes for velox file scan       
                                                                                
                                                                                
                                                                                
                                                                                
              [...]
+| spark.gluten.sql.columnar.backend.velox.maxCoalescedDistance                 
    | 512KB             | Set the max coalesced distance bytes for velox file 
scan                                                                            
                                                                                
                                                                                
                                                                                
                [...]
+| spark.gluten.sql.columnar.backend.velox.maxCompiledRegexes                   
    | 100               | Controls maximum number of compiled regular 
expression patterns per function instance per thread of execution.              
                                                                                
                                                                                
                                                                                
                        [...]
+| spark.gluten.sql.columnar.backend.velox.maxExtendedPartialAggregationMemory  
    | &lt;undefined&gt; | Set the max extended memory of partial aggregation in 
bytes. When this option is set to a value greater than 0, it will override 
spark.gluten.sql.columnar.backend.velox.maxExtendedPartialAggregationMemoryRatio.
 Note: this option only works when flushable partial aggregation is enabled. 
Ignored when 
spark.gluten.sql.columnar.backend.velox.flushablePartialAggregation=false.      
        [...]
+| 
spark.gluten.sql.columnar.backend.velox.maxExtendedPartialAggregationMemoryRatio
 | 0.15              | Set the max extended memory of partial aggregation as 
maxExtendedPartialAggregationMemoryRatio of offheap size. Note: this option 
only works when flushable partial aggregation is enabled. Ignored when 
spark.gluten.sql.columnar.backend.velox.flushablePartialAggregation=false.      
                                                                                
                           [...]
+| spark.gluten.sql.columnar.backend.velox.maxPartialAggregationMemory          
    | &lt;undefined&gt; | Set the max memory of partial aggregation in bytes. 
When this option is set to a value greater than 0, it will override 
spark.gluten.sql.columnar.backend.velox.maxPartialAggregationMemoryRatio. Note: 
this option only works when flushable partial aggregation is enabled. Ignored 
when spark.gluten.sql.columnar.backend.velox.flushablePartialAggregation=false. 
                              [...]
+| spark.gluten.sql.columnar.backend.velox.maxPartialAggregationMemoryRatio     
    | 0.1               | Set the max memory of partial aggregation as 
maxPartialAggregationMemoryRatio of offheap size. Note: this option only works 
when flushable partial aggregation is enabled. Ignored when 
spark.gluten.sql.columnar.backend.velox.flushablePartialAggregation=false.      
                                                                                
                                            [...]
+| spark.gluten.sql.columnar.backend.velox.maxPartitionsPerWritersSession       
    | 10000             | Maximum number of partitions per single table 
writer instance.                                                                
                                                                                
                                                                                
                                                                                
                    [...]
+| spark.gluten.sql.columnar.backend.velox.maxSpillBytes                        
    | 100G              | The maximum file size of a query                      
                                                                                
                                                                                
                                                                                
                                                                                
              [...]
+| spark.gluten.sql.columnar.backend.velox.maxSpillFileSize                     
    | 1GB               | The maximum size of a single spill file created       
                                                                                
                                                                                
                                                                                
                                                                                
              [...]
+| spark.gluten.sql.columnar.backend.velox.maxSpillLevel                        
    | 4                 | The max allowed spilling level with zero being the 
initial spilling level                                                          
                                                                                
                                                                                
                                                                                
                 [...]
+| spark.gluten.sql.columnar.backend.velox.maxSpillRunRows                      
    | 3M                | The maximum row size of a single spill run            
                                                                                
                                                                                
                                                                                
                                                                                
              [...]
+| spark.gluten.sql.columnar.backend.velox.maxTargetFileSize                    
    | 0b                | The target file size for each output file when 
writing data. 0 means no limit on target file size, and the actual file size 
will be determined by other factors such as max partition number and shuffle 
batch size.                                                                     
                                                                                
                           [...]
+| spark.gluten.sql.columnar.backend.velox.memCacheSize                         
    | 1GB               | The memory cache size                                 
                                                                                
                                                                                
                                                                                
                                                                                
              [...]
+| spark.gluten.sql.columnar.backend.velox.memInitCapacity                      
    | 8MB               | The initial memory capacity to reserve for a newly 
created Velox query memory pool.                                                
                                                                                
                                                                                
                                                                                
                 [...]
+| 
spark.gluten.sql.columnar.backend.velox.memoryPoolCapacityTransferAcrossTasks   
 | true              | Whether to allow memory capacity transfer between memory 
pools from different tasks.                                                     
                                                                                
                                                                                
                                                                                
           [...]
+| spark.gluten.sql.columnar.backend.velox.memoryUseHugePages                   
    | false             | Use explicit huge pages for Velox memory allocation.  
                                                                                
                                                                                
                                                                                
                                                                                
              [...]
+| spark.gluten.sql.columnar.backend.velox.orc.scan.enabled                     
    | true              | Enable velox orc scan. If disabled, vanilla spark orc 
scan will be used.                                                              
                                                                                
                                                                                
                                                                                
              [...]
+| spark.gluten.sql.columnar.backend.velox.orcUseColumnNames                    
    | true              | Maps table field names to file field names using 
names, not indices for ORC files.                                               
                                                                                
                                                                                
                                                                                
                   [...]
+| spark.gluten.sql.columnar.backend.velox.parquet.pageSizeBytes                
    | 1MB               | The page size in bytes is for compression.            
                                                                                
                                                                                
                                                                                
                                                                                
              [...]
+| spark.gluten.sql.columnar.backend.velox.parquetUseColumnNames                
    | true              | Maps table field names to file field names using 
names, not indices for Parquet files.                                           
                                                                                
                                                                                
                                                                                
                   [...]
+| spark.gluten.sql.columnar.backend.velox.prefetchRowGroups                    
    | 1                 | Set the prefetch row groups for velox file scan       
                                                                                
                                                                                
                                                                                
                                                                                
              [...]
+| spark.gluten.sql.columnar.backend.velox.queryTraceEnabled                    
    | false             | Enable query tracing flag.                            
                                                                                
                                                                                
                                                                                
                                                                                
              [...]
+| spark.gluten.sql.columnar.backend.velox.reclaimMaxWaitMs                     
    | 3600000ms         | The max time in ms to wait for memory reclaim.        
                                                                                
                                                                                
                                                                                
                                                                                
              [...]
+| spark.gluten.sql.columnar.backend.velox.resizeBatches.shuffleInput           
    | true              | If true, combine small columnar batches together 
before sending to shuffle. The default minimum output batch size is equal to 
0.25 * spark.gluten.sql.columnar.maxBatchSize                                   
                                                                                
                                                                                
                      [...]
+| spark.gluten.sql.columnar.backend.velox.resizeBatches.shuffleInput.minSize   
    | &lt;undefined&gt; | The minimum batch size for shuffle. If size of an 
input batch is smaller than the value, it will be combined with other batches 
before sending to shuffle. Only functions when 
spark.gluten.sql.columnar.backend.velox.resizeBatches.shuffleInput is set to 
true. Default value: 0.25 * <max batch size>                                    
                                                        [...]
+| 
spark.gluten.sql.columnar.backend.velox.resizeBatches.shuffleInputOuptut.minSize
 | &lt;undefined&gt; | The minimum batch size for shuffle input and output. If 
size of an input batch is smaller than the value, it will be combined with 
other batches before sending to shuffle. The same applies for batches output by 
shuffle read. Only functions when 
spark.gluten.sql.columnar.backend.velox.resizeBatches.shuffleInput or 
spark.gluten.sql.columnar.backend.velox.resizeBatches.shuffleOutput is s [...]
+| spark.gluten.sql.columnar.backend.velox.resizeBatches.shuffleOutput          
    | false             | If true, combine small columnar batches together 
right after shuffle read. The default minimum output batch size is equal to 
0.25 * spark.gluten.sql.columnar.maxBatchSize                                   
                                                                                
                                                                                
                       [...]
+| spark.gluten.sql.columnar.backend.velox.showTaskMetricsWhenFinished          
    | false             | Show velox full task metrics when finished.           
                                                                                
                                                                                
                                                                                
                                                                                
              [...]
+| spark.gluten.sql.columnar.backend.velox.spillFileSystem                      
    | local             | The filesystem used to store spill data. local: The 
local file system. heap-over-local: Write file to JVM heap if having extra heap 
space. Otherwise write to local file system.                                    
                                                                                
                                                                                
                [...]
+| spark.gluten.sql.columnar.backend.velox.spillStrategy                        
    | auto              | none: Disable spill on Velox backend; auto: Let Spark 
memory manager manage Velox's spilling                                          
                                                                                
                                                                                
                                                                                
              [...]
+| spark.gluten.sql.columnar.backend.velox.ssdCacheIOThreads                    
    | 1                 | The IO threads for cache promoting                    
                                                                                
                                                                                
                                                                                
                                                                                
              [...]
+| spark.gluten.sql.columnar.backend.velox.ssdCachePath                         
    | /tmp              | The folder to store the cache files, better on SSD    
                                                                                
                                                                                
                                                                                
                                                                                
              [...]
+| spark.gluten.sql.columnar.backend.velox.ssdCacheShards                       
    | 1                 | The cache shards                                      
                                                                                
                                                                                
                                                                                
                                                                                
              [...]
+| spark.gluten.sql.columnar.backend.velox.ssdCacheSize                         
    | 1GB               | The SSD cache size, will do memory caching only if 
this value = 0                                                                  
                                                                                
                                                                                
                                                                                
                 [...]
+| spark.gluten.sql.columnar.backend.velox.ssdCheckpointIntervalBytes           
    | 0                 | Checkpoint after every 'checkpointIntervalBytes' for 
SSD cache. 0 means no checkpointing.                                            
                                                                                
                                                                                
                                                                                
               [...]
+| spark.gluten.sql.columnar.backend.velox.ssdChecksumEnabled                   
    | false             | If true, checksum write to SSD is enabled.            
                                                                                
                                                                                
                                                                                
                                                                                
              [...]
+| spark.gluten.sql.columnar.backend.velox.ssdChecksumReadVerificationEnabled   
    | false             | If true, checksum read verification from SSD is 
enabled.                                                                        
                                                                                
                                                                                
                                                                                
                    [...]
+| spark.gluten.sql.columnar.backend.velox.ssdDisableFileCow                    
    | false             | True if copy on write should be disabled.             
                                                                                
                                                                                
                                                                                
                                                                                
              [...]
+| spark.gluten.sql.columnar.backend.velox.ssdODirect                           
    | false             | The O_DIRECT flag for cache writing                   
                                                                                
                                                                                
                                                                                
                                                                                
              [...]
+| spark.gluten.sql.columnar.backend.velox.valueStream.dynamicFilter.enabled    
    | false             | Whether to apply dynamic filters pushed down from 
hash probe in the ValueStream (shuffle reader) operator to filter rows before 
they reach the hash join.                                                       
                                                                                
                                                                                
                    [...]
+| spark.gluten.sql.enable.enhancedFeatures                                     
    | true              | Enable some features including iceberg native write 
and other features.                                                             
                                                                                
                                                                                
                                                                                
                [...]
+| spark.gluten.sql.rewrite.castArrayToString                                   
    | true              | When true, rewrite `cast(array as String)` to 
`concat('[', array_join(array, ', ', null), ']')` to allow offloading to Velox. 
                                                                                
                                                                                
                                                                                
                      [...]
+| spark.gluten.velox.broadcast.build.targetBytesPerThread                      
    | 32MB              | It is used to calculate the number of hash table 
build threads. Based on our testing across various thresholds (1MB to 128MB), 
we recommend a value of 32MB or 64MB, as these consistently provided the most 
significant performance gains.                                                  
                                                                                
                       [...]
+| spark.gluten.velox.castFromVarcharAddTrimNode                                
    | false             | If true, will add a trim node which has the same 
semantics as vanilla Spark to CAST-from-varchar. Otherwise, do nothing.         
                                                                                
                                                                                
                                                                                
                   [...]
 
 ## Gluten Velox backend *experimental* configurations
 
diff --git 
a/gluten-arrow/src/main/java/org/apache/gluten/vectorized/ShuffleReaderJniWrapper.java
 
b/gluten-arrow/src/main/java/org/apache/gluten/vectorized/ShuffleReaderJniWrapper.java
index 6a0f2130d7..449bc86558 100644
--- 
a/gluten-arrow/src/main/java/org/apache/gluten/vectorized/ShuffleReaderJniWrapper.java
+++ 
b/gluten-arrow/src/main/java/org/apache/gluten/vectorized/ShuffleReaderJniWrapper.java
@@ -42,7 +42,8 @@ public class ShuffleReaderJniWrapper implements RuntimeAware {
       int batchSize,
       long readerBufferSize,
       long deserializerBufferSize,
-      String shuffleWriterType);
+      String shuffleWriterType,
+      boolean enableHashShuffleReaderStreamMerge);
 
   public native long read(long shuffleReaderHandle, ShuffleStreamReader 
streamReader);
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to