This is an automated email from the ASF dual-hosted git repository.

zjuwangg pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/gluten.git


The following commit(s) were added to refs/heads/main by this push:
     new 4fda85d13d [VL] Add default compress for VeloxColumnarBatchSerializer to reduce memory usage (#12373)
4fda85d13d is described below

commit 4fda85d13d4897ae39fc8f2df4b517bb18940046
Author: Hao Chen <[email protected]>
AuthorDate: Tue Jun 30 18:48:38 2026 +0800

    [VL] Add default compress for VeloxColumnarBatchSerializer to reduce memory usage (#12373)
---
 .../org/apache/gluten/config/VeloxConfig.scala     |  9 ++++++
 .../gluten/execution/VeloxHashJoinSuite.scala      | 36 ++++++++++++++++++++++
 cpp/velox/compute/VeloxRuntime.cc                  |  4 ++-
 cpp/velox/config/VeloxConfig.h                     |  5 +++
 .../serializer/VeloxColumnarBatchSerializer.cc     |  7 +++--
 .../serializer/VeloxColumnarBatchSerializer.h      |  3 +-
 .../org/apache/gluten/config/GlutenConfig.scala    |  3 +-
 7 files changed, 61 insertions(+), 6 deletions(-)

diff --git 
a/backends-velox/src/main/scala/org/apache/gluten/config/VeloxConfig.scala 
b/backends-velox/src/main/scala/org/apache/gluten/config/VeloxConfig.scala
index 1f87ec000a..dcb79a462f 100644
--- a/backends-velox/src/main/scala/org/apache/gluten/config/VeloxConfig.scala
+++ b/backends-velox/src/main/scala/org/apache/gluten/config/VeloxConfig.scala
@@ -631,6 +631,15 @@ object VeloxConfig extends ConfigRegistry {
       .booleanConf
       .createWithDefault(false)
 
+  val COLUMNAR_VELOX_BATCH_SERIALIZER_COMPRESSION =
+    
buildConf("spark.gluten.sql.columnar.backend.velox.columnarBatchSerializerCompression")
+      .internal()
+      .doc("which compression for the columnar batch serializer (e.g. 
broadcast).")
+      .stringConf
+      .transform(_.toLowerCase(Locale.ROOT))
+      .checkValues(Set("none", "zstd", "zlib", "snappy", "lz4", "gzip"))
+      .createWithDefault("none")
+
   val VELOX_HASHMAP_ABANDON_BUILD_DUPHASH_MIN_ROWS =
     buildConf("spark.gluten.velox.abandonDedupHashMap.minRows")
       .experimental()
diff --git 
a/backends-velox/src/test/scala/org/apache/gluten/execution/VeloxHashJoinSuite.scala
 
b/backends-velox/src/test/scala/org/apache/gluten/execution/VeloxHashJoinSuite.scala
index a83e0ebf0e..c80950aa1f 100644
--- 
a/backends-velox/src/test/scala/org/apache/gluten/execution/VeloxHashJoinSuite.scala
+++ 
b/backends-velox/src/test/scala/org/apache/gluten/execution/VeloxHashJoinSuite.scala
@@ -445,4 +445,40 @@ class VeloxHashJoinSuite extends 
VeloxWholeStageTransformerSuite {
         }
     }
   }
+
+  test("test columnarBatchSerializerCompression") {
+    Seq("none", "zstd", "zlib", "snappy", "lz4", "gzip").foreach(
+      compression =>
+        withSQLConf(
+          GlutenConfig.GLUTEN_COLUMNAR_TO_ROW_MEM_THRESHOLD.key -> "16",
+          VeloxConfig.VELOX_BROADCAST_BUILD_RELATION_USE_OFFHEAP.key -> "true",
+          VeloxConfig.COLUMNAR_VELOX_BATCH_SERIALIZER_COMPRESSION.key -> 
compression
+        ) {
+          withTable("t1", "t2") {
+            spark.sql("""
+                        |CREATE TABLE t1 USING PARQUET
+                        |AS SELECT id as c1, id as c2 FROM range(10)
+                        |""".stripMargin)
+
+            spark.sql("""
+                        |CREATE TABLE t2 USING PARQUET PARTITIONED BY (c1)
+                        |AS SELECT id as c1, id as c2 FROM range(30)
+                        |""".stripMargin)
+
+            val df = spark.sql("""
+                                 |SELECT t1.c2
+                                 |FROM t1, t2
+                                 |WHERE t1.c1 = t2.c1
+                                 |AND t1.c2 < 4
+                                 |""".stripMargin)
+
+            checkAnswer(df, Row(0) :: Row(1) :: Row(2) :: Row(3) :: Nil)
+
+            val subqueryBroadcastExecs = 
collectWithSubqueries(df.queryExecution.executedPlan) {
+              case subqueryBroadcast: ColumnarSubqueryBroadcastExec => 
subqueryBroadcast
+            }
+            assert(subqueryBroadcastExecs.size == 1)
+          }
+        })
+  }
 }
diff --git a/cpp/velox/compute/VeloxRuntime.cc 
b/cpp/velox/compute/VeloxRuntime.cc
index f13430bd0c..237048553a 100644
--- a/cpp/velox/compute/VeloxRuntime.cc
+++ b/cpp/velox/compute/VeloxRuntime.cc
@@ -629,7 +629,9 @@ std::unique_ptr<ColumnarBatchSerializer> 
VeloxRuntime::createColumnarBatchSerial
     return std::make_unique<VeloxGpuColumnarBatchSerializer>(arrowPool, 
veloxPool, cSchema);
   }
 #endif
-  return std::make_unique<VeloxColumnarBatchSerializer>(arrowPool, veloxPool, 
cSchema);
+  auto compressionKind =
+      veloxCfg_->get<std::string>(kColumnarBatchSerializerCompression, 
kColumnarBatchSerializerCompressionDefault);
+  return std::make_unique<VeloxColumnarBatchSerializer>(arrowPool, veloxPool, 
cSchema, compressionKind);
 }
 
 void VeloxRuntime::enableDumping() {
diff --git a/cpp/velox/config/VeloxConfig.h b/cpp/velox/config/VeloxConfig.h
index d88c436193..ad51066f40 100644
--- a/cpp/velox/config/VeloxConfig.h
+++ b/cpp/velox/config/VeloxConfig.h
@@ -50,6 +50,11 @@ const std::string kSparkShuffleSpillCompress = 
"spark.shuffle.spill.compress";
 const std::string kCompressionKind = "spark.io.compression.codec";
 /// The compression codec to use for spilling. Use kCompressionKind if not set.
 const std::string kSpillCompressionKind = 
"spark.gluten.sql.columnar.backend.velox.spillCompressionCodec";
+
+// Which compression kind to use for the columnar batch serializer (e.g. 
broadcast).
+const std::string kColumnarBatchSerializerCompression =
+    
"spark.gluten.sql.columnar.backend.velox.columnarBatchSerializerCompression";
+const std::string kColumnarBatchSerializerCompressionDefault = "none";
 const std::string kMaxPartialAggregationMemoryRatio =
     "spark.gluten.sql.columnar.backend.velox.maxPartialAggregationMemoryRatio";
 const std::string kMaxPartialAggregationMemory = 
"spark.gluten.sql.columnar.backend.velox.maxPartialAggregationMemory";
diff --git a/cpp/velox/operators/serializer/VeloxColumnarBatchSerializer.cc 
b/cpp/velox/operators/serializer/VeloxColumnarBatchSerializer.cc
index 4dbda85a8b..cc1cd6bb4e 100644
--- a/cpp/velox/operators/serializer/VeloxColumnarBatchSerializer.cc
+++ b/cpp/velox/operators/serializer/VeloxColumnarBatchSerializer.cc
@@ -27,6 +27,7 @@
 
 #include "memory/ArrowMemory.h"
 #include "memory/VeloxColumnarBatch.h"
+#include "velox/common/compression/Compression.h"
 #include "velox/common/memory/Memory.h"
 #include "velox/vector/FlatVector.h"
 #include "velox/vector/LazyVector.h"
@@ -51,7 +52,8 @@ std::unique_ptr<ByteInputStream> toByteStream(uint8_t* data, 
int32_t size) {
 VeloxColumnarBatchSerializer::VeloxColumnarBatchSerializer(
     arrow::MemoryPool* arrowPool,
     std::shared_ptr<memory::MemoryPool> veloxPool,
-    struct ArrowSchema* cSchema)
+    struct ArrowSchema* cSchema,
+    const std::string& compressionKind)
     : ColumnarBatchSerializer(arrowPool), veloxPool_(std::move(veloxPool)) {
   // serializeColumnarBatches don't need rowType_
   if (cSchema != nullptr) {
@@ -61,8 +63,7 @@ VeloxColumnarBatchSerializer::VeloxColumnarBatchSerializer(
   arena_ = std::make_unique<StreamArena>(veloxPool_.get());
   serde_ = std::make_unique<serializer::presto::PrestoVectorSerde>();
   options_.useLosslessTimestamp = true;
-  // Required by serializeSingleColumn / deserializeSingleColumn APIs 
(VELOX_USER_CHECK_EQ).
-  options_.compressionKind = common::CompressionKind::CompressionKind_NONE;
+  options_.compressionKind = 
facebook::velox::common::stringToCompressionKind(compressionKind);
   options_.nullsFirst = false;
 }
 
diff --git a/cpp/velox/operators/serializer/VeloxColumnarBatchSerializer.h 
b/cpp/velox/operators/serializer/VeloxColumnarBatchSerializer.h
index 33ff1301b6..4e4a3d4519 100644
--- a/cpp/velox/operators/serializer/VeloxColumnarBatchSerializer.h
+++ b/cpp/velox/operators/serializer/VeloxColumnarBatchSerializer.h
@@ -43,7 +43,8 @@ class VeloxColumnarBatchSerializer : public 
ColumnarBatchSerializer {
   VeloxColumnarBatchSerializer(
       arrow::MemoryPool* arrowPool,
       std::shared_ptr<facebook::velox::memory::MemoryPool> veloxPool,
-      struct ArrowSchema* cSchema);
+      struct ArrowSchema* cSchema,
+      const std::string& compressionKind = "none");
 
   void append(const std::shared_ptr<ColumnarBatch>& batch) override;
 
diff --git 
a/gluten-substrait/src/main/scala/org/apache/gluten/config/GlutenConfig.scala 
b/gluten-substrait/src/main/scala/org/apache/gluten/config/GlutenConfig.scala
index 01dfb912a6..37d8b2b3ab 100644
--- 
a/gluten-substrait/src/main/scala/org/apache/gluten/config/GlutenConfig.scala
+++ 
b/gluten-substrait/src/main/scala/org/apache/gluten/config/GlutenConfig.scala
@@ -528,7 +528,8 @@ object GlutenConfig extends ConfigRegistry {
     "spark.gluten.sql.columnar.backend.velox.cachePrefetchMinPct",
     
"spark.gluten.sql.columnar.backend.velox.memoryPoolCapacityTransferAcrossTasks",
     "spark.gluten.sql.columnar.backend.velox.preferredBatchBytes",
-    "spark.gluten.sql.columnar.backend.velox.cudf.enableTableScan"
+    "spark.gluten.sql.columnar.backend.velox.cudf.enableTableScan",
+    
"spark.gluten.sql.columnar.backend.velox.columnarBatchSerializerCompression"
   )
 
   /** Get dynamic configs. */


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to