This is an automated email from the ASF dual-hosted git repository.

kejia pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/gluten.git


The following commit(s) were added to refs/heads/main by this push:
     new 89a77b6f7d [GLUTEN-11888] [VL] Parallel build hash table to improve bhj performance (#11889)
89a77b6f7d is described below

commit 89a77b6f7d0e78c7947703eb1fae98381341e9bc
Author: JiaKe <[email protected]>
AuthorDate: Thu Apr 9 14:00:02 2026 +0100

    [GLUTEN-11888] [VL] Parallel build hash table to improve bhj performance (#11889)
---
 .../clickhouse/CHSparkPlanExecApi.scala            |  3 +-
 .../gluten/backendsapi/velox/VeloxMetricsApi.scala |  8 ++++--
 .../backendsapi/velox/VeloxSparkPlanExecApi.scala  | 16 +++++++++--
 .../org/apache/gluten/config/VeloxConfig.scala     | 18 ++++++------
 .../gluten/execution/HashJoinExecTransformer.scala |  5 ++--
 .../sql/execution/ColumnarBuildSideRelation.scala  | 17 ++++++++---
 .../unsafe/UnsafeColumnarBuildSideRelation.scala   | 23 +++++++++++----
 cpp/velox/jni/VeloxJniWrapper.cc                   | 33 ++++++++++------------
 .../gluten/backendsapi/SparkPlanExecApi.scala      |  3 +-
 .../execution/ColumnarBroadcastExchangeExec.scala  |  3 +-
 10 files changed, 84 insertions(+), 45 deletions(-)

diff --git 
a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHSparkPlanExecApi.scala
 
b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHSparkPlanExecApi.scala
index cdf2eae418..832932f0a4 100644
--- 
a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHSparkPlanExecApi.scala
+++ 
b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHSparkPlanExecApi.scala
@@ -480,7 +480,8 @@ class CHSparkPlanExecApi extends SparkPlanExecApi with 
Logging {
       mode: BroadcastMode,
       child: SparkPlan,
       numOutputRows: SQLMetric,
-      dataSize: SQLMetric): BuildSideRelation = {
+      dataSize: SQLMetric,
+      buildThreads: SQLMetric): BuildSideRelation = {
 
     val (buildKeys, isNullAware) = mode match {
       case mode1: HashedRelationBroadcastMode =>
diff --git 
a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxMetricsApi.scala
 
b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxMetricsApi.scala
index f13217442f..caa2e7de4b 100644
--- 
a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxMetricsApi.scala
+++ 
b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxMetricsApi.scala
@@ -554,7 +554,8 @@ class VeloxMetricsApi extends MetricsApi with Logging {
       "dataSize" -> SQLMetrics.createSizeMetric(sparkContext, "data size"),
       "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of 
output rows"),
       "collectTime" -> SQLMetrics.createTimingMetric(sparkContext, "time to 
collect"),
-      "broadcastTime" -> SQLMetrics.createTimingMetric(sparkContext, "time to 
broadcast")
+      "broadcastTime" -> SQLMetrics.createTimingMetric(sparkContext, "time to 
broadcast"),
+      "buildThreads" -> SQLMetrics.createMetric(sparkContext, "build threads")
     )
 
   override def genColumnarSubqueryBroadcastMetrics(
@@ -667,7 +668,10 @@ class VeloxMetricsApi extends MetricsApi with Logging {
       "numOutputBytes" -> SQLMetrics.createSizeMetric(sparkContext, "number of 
output bytes"),
       "loadLazyVectorTime" -> SQLMetrics.createNanoTimingMetric(
         sparkContext,
-        "time of loading lazy vectors")
+        "time of loading lazy vectors"),
+      "buildHashTableTime" -> SQLMetrics.createTimingMetric(
+        sparkContext,
+        "time to build hash table")
     )
 
   override def genHashJoinTransformerMetricsUpdater(
diff --git 
a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxSparkPlanExecApi.scala
 
b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxSparkPlanExecApi.scala
index f885bca2eb..863195a6b4 100644
--- 
a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxSparkPlanExecApi.scala
+++ 
b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxSparkPlanExecApi.scala
@@ -706,7 +706,8 @@ class VeloxSparkPlanExecApi extends SparkPlanExecApi with 
Logging {
       mode: BroadcastMode,
       child: SparkPlan,
       numOutputRows: SQLMetric,
-      dataSize: SQLMetric): BuildSideRelation = {
+      dataSize: SQLMetric,
+      buildThreads: SQLMetric): BuildSideRelation = {
 
     val buildKeys = mode match {
       case mode1: HashedRelationBroadcastMode =>
@@ -851,6 +852,13 @@ class VeloxSparkPlanExecApi extends SparkPlanExecApi with 
Logging {
     numOutputRows += serialized.map(_.numRows).sum
     dataSize += rawSize
 
+    val rawThreads =
+      math
+        .ceil(dataSize.value.toDouble / 
VeloxConfig.get.veloxBroadcastHashTableBuildTargetBytes)
+        .toInt
+    val buildThreadsValue = if (rawThreads < 1) 1 else rawThreads
+    buildThreads += buildThreadsValue
+
     if (useOffheapBroadcastBuildRelation) {
       TaskResources.runUnsafe {
         UnsafeColumnarBuildSideRelation(
@@ -858,7 +866,8 @@ class VeloxSparkPlanExecApi extends SparkPlanExecApi with 
Logging {
           serialized.flatMap(_.offHeapData().asScala),
           mode,
           newBuildKeys,
-          offload)
+          offload,
+          buildThreadsValue)
       }
     } else {
       ColumnarBuildSideRelation(
@@ -866,7 +875,8 @@ class VeloxSparkPlanExecApi extends SparkPlanExecApi with 
Logging {
         serialized.flatMap(_.onHeapData().asScala).toArray,
         mode,
         newBuildKeys,
-        offload)
+        offload,
+        buildThreadsValue)
     }
   }
 
diff --git 
a/backends-velox/src/main/scala/org/apache/gluten/config/VeloxConfig.scala 
b/backends-velox/src/main/scala/org/apache/gluten/config/VeloxConfig.scala
index 8c39f02128..f34b3b6e62 100644
--- a/backends-velox/src/main/scala/org/apache/gluten/config/VeloxConfig.scala
+++ b/backends-velox/src/main/scala/org/apache/gluten/config/VeloxConfig.scala
@@ -62,8 +62,8 @@ class VeloxConfig(conf: SQLConf) extends GlutenConfig(conf) {
   def enableBroadcastBuildOncePerExecutor: Boolean =
     getConf(VELOX_BROADCAST_BUILD_HASHTABLE_ONCE_PER_EXECUTOR)
 
-  def veloxBroadcastHashTableBuildThreads: Int =
-    getConf(COLUMNAR_VELOX_BROADCAST_HASH_TABLE_BUILD_THREADS)
+  def veloxBroadcastHashTableBuildTargetBytes: Long =
+    getConf(COLUMNAR_VELOX_BROADCAST_HASH_TABLE_BUILD_TARGET_BYTES)
 
   def veloxOrcScanEnabled: Boolean =
     getConf(VELOX_ORC_SCAN_ENABLED)
@@ -206,12 +206,14 @@ object VeloxConfig extends ConfigRegistry {
       .intConf
       .createOptional
 
-  val COLUMNAR_VELOX_BROADCAST_HASH_TABLE_BUILD_THREADS =
-    
buildStaticConf("spark.gluten.sql.columnar.backend.velox.broadcastHashTableBuildThreads")
-      .doc("The number of threads used to build the broadcast hash table. " +
-        "If not set or set to 0, it will use the default number of threads 
(available processors).")
-      .intConf
-      .createWithDefault(1)
+  val COLUMNAR_VELOX_BROADCAST_HASH_TABLE_BUILD_TARGET_BYTES =
+    buildStaticConf("spark.gluten.velox.broadcast.build.targetBytesPerThread")
+      .doc(
+        "It is used to calculate the number of hash table build threads. Based 
on our testing" +
+          " across various thresholds (1MB to 128MB), we recommend a value of 
32MB or 64MB," +
+          " as these consistently provided the most significant performance 
gains.")
+      .bytesConf(ByteUnit.BYTE)
+      .createWithDefaultString("32MB")
 
   val COLUMNAR_VELOX_ASYNC_TIMEOUT =
     
buildStaticConf("spark.gluten.sql.columnar.backend.velox.asyncTimeoutOnTaskStopping")
diff --git 
a/backends-velox/src/main/scala/org/apache/gluten/execution/HashJoinExecTransformer.scala
 
b/backends-velox/src/main/scala/org/apache/gluten/execution/HashJoinExecTransformer.scala
index fb57848e54..ba014134d0 100644
--- 
a/backends-velox/src/main/scala/org/apache/gluten/execution/HashJoinExecTransformer.scala
+++ 
b/backends-velox/src/main/scala/org/apache/gluten/execution/HashJoinExecTransformer.scala
@@ -25,6 +25,7 @@ import org.apache.spark.sql.catalyst.optimizer.{BuildRight, 
BuildSide}
 import org.apache.spark.sql.catalyst.plans._
 import org.apache.spark.sql.execution.{SparkPlan, SQLExecution}
 import org.apache.spark.sql.execution.joins.BuildSideRelation
+import org.apache.spark.sql.execution.metric.SQLMetric
 import org.apache.spark.sql.vectorized.ColumnarBatch
 
 import io.substrait.proto.JoinRel
@@ -158,7 +159,7 @@ case class BroadcastHashJoinExecTransformer(
         buildBroadcastTableId,
         isNullAwareAntiJoin,
         bloomFilterPushdownSize,
-        VeloxConfig.get.veloxBroadcastHashTableBuildThreads
+        metrics.get("buildHashTableTime")
       )
     val broadcastRDD = VeloxBroadcastBuildSideRDD(sparkContext, broadcast, 
context)
     // FIXME: Do we have to make build side a RDD?
@@ -176,4 +177,4 @@ case class BroadcastHashJoinContext(
     buildHashTableId: String,
     isNullAwareAntiJoin: Boolean = false,
     bloomFilterPushdownSize: Long,
-    broadcastHashTableBuildThreads: Int)
+    buildHashTableTimeMetric: Option[SQLMetric] = None)
diff --git 
a/backends-velox/src/main/scala/org/apache/spark/sql/execution/ColumnarBuildSideRelation.scala
 
b/backends-velox/src/main/scala/org/apache/spark/sql/execution/ColumnarBuildSideRelation.scala
index b106319e81..1b21ef7b42 100644
--- 
a/backends-velox/src/main/scala/org/apache/spark/sql/execution/ColumnarBuildSideRelation.scala
+++ 
b/backends-velox/src/main/scala/org/apache/spark/sql/execution/ColumnarBuildSideRelation.scala
@@ -51,7 +51,8 @@ object ColumnarBuildSideRelation {
       batches: Array[Array[Byte]],
       mode: BroadcastMode,
       newBuildKeys: Seq[Expression] = Seq.empty,
-      offload: Boolean = false): ColumnarBuildSideRelation = {
+      offload: Boolean = false,
+      buildThreads: Int = 1): ColumnarBuildSideRelation = {
     val boundMode = mode match {
       case HashedRelationBroadcastMode(keys, isNullAware) =>
         // Bind each key to the build-side output so simple cols become 
BoundReference
@@ -66,7 +67,8 @@ object ColumnarBuildSideRelation {
       batches,
       BroadcastModeUtils.toSafe(boundMode),
       newBuildKeys,
-      offload)
+      offload,
+      buildThreads)
   }
 }
 
@@ -75,7 +77,8 @@ case class ColumnarBuildSideRelation(
     batches: Array[Array[Byte]],
     safeBroadcastMode: SafeBroadcastMode,
     newBuildKeys: Seq[Expression],
-    offload: Boolean)
+    offload: Boolean,
+    buildThreads: Int)
   extends BuildSideRelation
   with Logging
   with KnownSizeEstimation {
@@ -156,6 +159,7 @@ case class ColumnarBuildSideRelation(
       broadcastContext: BroadcastHashJoinContext): (Long, 
ColumnarBuildSideRelation) =
     synchronized {
       if (hashTableData == 0) {
+        val startTime = System.nanoTime()
         val runtime = Runtimes.contextInstance(
           BackendsApiManager.getBackendName,
           "ColumnarBuildSideRelation#buildHashTable")
@@ -215,10 +219,15 @@ case class ColumnarBuildSideRelation(
             SubstraitUtil.toNameStruct(newOutput).toByteArray,
             broadcastContext.isNullAwareAntiJoin,
             broadcastContext.bloomFilterPushdownSize,
-            broadcastContext.broadcastHashTableBuildThreads
+            buildThreads
           )
 
         jniWrapper.close(serializeHandle)
+
+        // Update build hash table time metric
+        val elapsedTime = System.nanoTime() - startTime
+        broadcastContext.buildHashTableTimeMetric.foreach(_ += elapsedTime / 
1000000)
+
         (hashTableData, this)
       } else {
         (HashJoinBuilder.cloneHashTable(hashTableData), null)
diff --git 
a/backends-velox/src/main/scala/org/apache/spark/sql/execution/unsafe/UnsafeColumnarBuildSideRelation.scala
 
b/backends-velox/src/main/scala/org/apache/spark/sql/execution/unsafe/UnsafeColumnarBuildSideRelation.scala
index 01fbb86bee..26195d48af 100644
--- 
a/backends-velox/src/main/scala/org/apache/spark/sql/execution/unsafe/UnsafeColumnarBuildSideRelation.scala
+++ 
b/backends-velox/src/main/scala/org/apache/spark/sql/execution/unsafe/UnsafeColumnarBuildSideRelation.scala
@@ -56,7 +56,8 @@ object UnsafeColumnarBuildSideRelation {
       batches: Seq[UnsafeByteArray],
       mode: BroadcastMode,
       newBuildKeys: Seq[Expression] = Seq.empty,
-      offload: Boolean = false): UnsafeColumnarBuildSideRelation = {
+      offload: Boolean = false,
+      buildThreads: Int = 1): UnsafeColumnarBuildSideRelation = {
     val boundMode = mode match {
       case HashedRelationBroadcastMode(keys, isNullAware) =>
         // Bind each key to the build-side output so simple cols become 
BoundReference
@@ -71,7 +72,8 @@ object UnsafeColumnarBuildSideRelation {
       batches,
       BroadcastModeUtils.toSafe(boundMode),
       newBuildKeys,
-      offload)
+      offload,
+      buildThreads)
   }
 }
 
@@ -91,7 +93,8 @@ class UnsafeColumnarBuildSideRelation(
     private var batches: Seq[UnsafeByteArray],
     private var safeBroadcastMode: SafeBroadcastMode,
     private var newBuildKeys: Seq[Expression],
-    private var offload: Boolean)
+    private var offload: Boolean,
+    private var buildThreads: Int)
   extends BuildSideRelation
   with Externalizable
   with Logging
@@ -113,7 +116,7 @@ class UnsafeColumnarBuildSideRelation(
 
   /** needed for serialization. */
   def this() = {
-    this(null, null, null, Seq.empty, false)
+    this(null, null, null, Seq.empty, false, 1)
   }
 
   private[unsafe] def getBatches(): Seq[UnsafeByteArray] = {
@@ -125,6 +128,7 @@ class UnsafeColumnarBuildSideRelation(
   def buildHashTable(broadcastContext: BroadcastHashJoinContext): (Long, 
BuildSideRelation) =
     synchronized {
       if (hashTableData == 0) {
+        val startTime = System.nanoTime()
         val runtime = Runtimes.contextInstance(
           BackendsApiManager.getBackendName,
           "UnsafeColumnarBuildSideRelation#buildHashTable")
@@ -185,10 +189,15 @@ class UnsafeColumnarBuildSideRelation(
             SubstraitUtil.toNameStruct(newOutput).toByteArray,
             broadcastContext.isNullAwareAntiJoin,
             broadcastContext.bloomFilterPushdownSize,
-            broadcastContext.broadcastHashTableBuildThreads
+            buildThreads
           )
 
         jniWrapper.close(serializeHandle)
+
+        // Update build hash table time metric
+        val elapsedTime = System.nanoTime() - startTime
+        broadcastContext.buildHashTableTimeMetric.foreach(_ += elapsedTime / 
1000000)
+
         (hashTableData, this)
       } else {
         (HashJoinBuilder.cloneHashTable(hashTableData), null)
@@ -205,6 +214,7 @@ class UnsafeColumnarBuildSideRelation(
     out.writeObject(batches.toArray)
     out.writeObject(newBuildKeys)
     out.writeBoolean(offload)
+    out.writeInt(buildThreads)
   }
 
   override def write(kryo: Kryo, out: Output): Unit = Utils.tryOrIOException {
@@ -213,6 +223,7 @@ class UnsafeColumnarBuildSideRelation(
     kryo.writeClassAndObject(out, batches.toArray)
     kryo.writeClassAndObject(out, newBuildKeys)
     out.writeBoolean(offload)
+    out.writeInt(buildThreads)
   }
 
   override def readExternal(in: ObjectInput): Unit = Utils.tryOrIOException {
@@ -221,6 +232,7 @@ class UnsafeColumnarBuildSideRelation(
     batches = in.readObject().asInstanceOf[Array[UnsafeByteArray]].toSeq
     newBuildKeys = in.readObject().asInstanceOf[Seq[Expression]]
     offload = in.readBoolean()
+    buildThreads = in.readInt()
   }
 
   override def read(kryo: Kryo, in: Input): Unit = Utils.tryOrIOException {
@@ -229,6 +241,7 @@ class UnsafeColumnarBuildSideRelation(
     batches = 
kryo.readClassAndObject(in).asInstanceOf[Array[UnsafeByteArray]].toSeq
     newBuildKeys = kryo.readClassAndObject(in).asInstanceOf[Seq[Expression]]
     offload = in.readBoolean()
+    buildThreads = in.readInt()
   }
 
   private def transformProjection: UnsafeProjection = safeBroadcastMode match {
diff --git a/cpp/velox/jni/VeloxJniWrapper.cc b/cpp/velox/jni/VeloxJniWrapper.cc
index 4705a646e2..4f8c307ada 100644
--- a/cpp/velox/jni/VeloxJniWrapper.cc
+++ b/cpp/velox/jni/VeloxJniWrapper.cc
@@ -21,6 +21,8 @@
 #include <jni/JniCommon.h>
 #include <velox/connectors/hive/PartitionIdGenerator.h>
 #include <velox/exec/OperatorUtils.h>
+#include <folly/futures/Future.h>
+#include <folly/executors/CPUThreadPoolExecutor.h>
 
 #include <exception>
 #include "JniUdf.h"
@@ -946,7 +948,7 @@ JNIEXPORT jlong JNICALL 
Java_org_apache_gluten_vectorized_HashJoinBuilder_native
     jbyteArray namedStruct,
     jboolean isNullAwareAntiJoin,
     jlong bloomFilterPushdownSize,
-    jint broadcastHashTableBuildThreads) {
+    jint numThreads) {
   JNI_METHOD_START
   const auto hashTableId = jStringToCString(env, tableId);
 
@@ -985,17 +987,7 @@ JNIEXPORT jlong JNICALL 
Java_org_apache_gluten_vectorized_HashJoinBuilder_native
     cb.push_back(ObjectStore::retrieve<ColumnarBatch>(handle));
   }
 
-  size_t maxThreads = broadcastHashTableBuildThreads > 0
-      ? std::min((size_t)broadcastHashTableBuildThreads, (size_t)32)
-      : std::min((size_t)std::thread::hardware_concurrency(), (size_t)32);
-
-  // Heuristic: Each thread should process at least a certain number of 
batches to justify parallelism overhead.
-  // 32 batches is roughly 128k rows, which is a reasonable granularity for a 
single thread.
-  constexpr size_t kMinBatchesPerThread = 32;
-  size_t numThreads = std::min(maxThreads, (handleCount + kMinBatchesPerThread 
- 1) / kMinBatchesPerThread);
-  numThreads = std::max((size_t)1, numThreads);
-
-  if (numThreads <= 1) {
+  if (numThreads == 1) {
     auto builder = nativeHashTableBuild(
         hashJoinKeys,
         names,
@@ -1020,16 +1012,20 @@ JNIEXPORT jlong JNICALL 
Java_org_apache_gluten_vectorized_HashJoinBuilder_native
     return gluten::getHashTableObjStore()->save(builder);
   }
 
-  std::vector<std::thread> threads;
-
+  // Use thread pool (executor) instead of creating threads directly
+  auto executor = VeloxBackend::get()->executor();
+  
   std::vector<std::shared_ptr<gluten::HashTableBuilder>> 
hashTableBuilders(numThreads);
   std::vector<std::unique_ptr<facebook::velox::exec::BaseHashTable>> 
otherTables(numThreads);
+  std::vector<folly::Future<folly::Unit>> futures;
+  futures.reserve(numThreads);
 
   for (size_t t = 0; t < numThreads; ++t) {
     size_t start = (handleCount * t) / numThreads;
     size_t end = (handleCount * (t + 1)) / numThreads;
 
-    threads.emplace_back([&, t, start, end]() {
+    // Submit task to thread pool
+    auto future = folly::via(executor, [&, t, start, end]() {
       std::vector<std::shared_ptr<gluten::ColumnarBatch>> threadBatches;
       for (size_t i = start; i < end; ++i) {
         threadBatches.push_back(cb[i]);
@@ -1050,11 +1046,12 @@ JNIEXPORT jlong JNICALL 
Java_org_apache_gluten_vectorized_HashJoinBuilder_native
       hashTableBuilders[t] = std::move(builder);
       otherTables[t] = std::move(hashTableBuilders[t]->uniqueTable());
     });
+    
+    futures.push_back(std::move(future));
   }
 
-  for (auto& thread : threads) {
-    thread.join();
-  }
+  // Wait for all tasks to complete
+  folly::collectAll(futures).wait();
 
   auto mainTable = std::move(otherTables[0]);
   std::vector<std::unique_ptr<facebook::velox::exec::BaseHashTable>> tables;
diff --git 
a/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/SparkPlanExecApi.scala
 
b/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/SparkPlanExecApi.scala
index 73f2b57b9d..ce0a79f0bc 100644
--- 
a/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/SparkPlanExecApi.scala
+++ 
b/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/SparkPlanExecApi.scala
@@ -435,7 +435,8 @@ trait SparkPlanExecApi {
       mode: BroadcastMode,
       child: SparkPlan,
       numOutputRows: SQLMetric,
-      dataSize: SQLMetric): BuildSideRelation
+      dataSize: SQLMetric,
+      buildThreads: SQLMetric = null): BuildSideRelation
 
   def doCanonicalizeForBroadcastMode(mode: BroadcastMode): BroadcastMode = {
     mode.canonicalized
diff --git 
a/gluten-substrait/src/main/scala/org/apache/spark/sql/execution/ColumnarBroadcastExchangeExec.scala
 
b/gluten-substrait/src/main/scala/org/apache/spark/sql/execution/ColumnarBroadcastExchangeExec.scala
index 26a4ef3919..4b2890d51d 100644
--- 
a/gluten-substrait/src/main/scala/org/apache/spark/sql/execution/ColumnarBroadcastExchangeExec.scala
+++ 
b/gluten-substrait/src/main/scala/org/apache/spark/sql/execution/ColumnarBroadcastExchangeExec.scala
@@ -75,7 +75,8 @@ case class ColumnarBroadcastExchangeExec(mode: BroadcastMode, 
child: SparkPlan)
               mode,
               child,
               longMetric("numOutputRows"),
-              longMetric("dataSize"))
+              longMetric("dataSize"),
+              metrics.getOrElse("buildThreads", null))
         }
 
         val broadcasted = GlutenTimeMetric.millis(longMetric("broadcastTime")) 
{


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to