This is an automated email from the ASF dual-hosted git repository.
kejia pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/gluten.git
The following commit(s) were added to refs/heads/main by this push:
new 89a77b6f7d [GLUTEN-11888] [VL] Build broadcast hash tables in parallel to improve BHJ (broadcast hash join) performance (#11889)
89a77b6f7d is described below
commit 89a77b6f7d0e78c7947703eb1fae98381341e9bc
Author: JiaKe <[email protected]>
AuthorDate: Thu Apr 9 14:00:02 2026 +0100
[GLUTEN-11888] [VL] Build broadcast hash tables in parallel to improve BHJ (broadcast hash join) performance (#11889)
---
.../clickhouse/CHSparkPlanExecApi.scala | 3 +-
.../gluten/backendsapi/velox/VeloxMetricsApi.scala | 8 ++++--
.../backendsapi/velox/VeloxSparkPlanExecApi.scala | 16 +++++++++--
.../org/apache/gluten/config/VeloxConfig.scala | 18 ++++++------
.../gluten/execution/HashJoinExecTransformer.scala | 5 ++--
.../sql/execution/ColumnarBuildSideRelation.scala | 17 ++++++++---
.../unsafe/UnsafeColumnarBuildSideRelation.scala | 23 +++++++++++----
cpp/velox/jni/VeloxJniWrapper.cc | 33 ++++++++++------------
.../gluten/backendsapi/SparkPlanExecApi.scala | 3 +-
.../execution/ColumnarBroadcastExchangeExec.scala | 3 +-
10 files changed, 84 insertions(+), 45 deletions(-)
diff --git
a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHSparkPlanExecApi.scala
b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHSparkPlanExecApi.scala
index cdf2eae418..832932f0a4 100644
---
a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHSparkPlanExecApi.scala
+++
b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHSparkPlanExecApi.scala
@@ -480,7 +480,8 @@ class CHSparkPlanExecApi extends SparkPlanExecApi with
Logging {
mode: BroadcastMode,
child: SparkPlan,
numOutputRows: SQLMetric,
- dataSize: SQLMetric): BuildSideRelation = {
+ dataSize: SQLMetric,
+ buildThreads: SQLMetric): BuildSideRelation = {
val (buildKeys, isNullAware) = mode match {
case mode1: HashedRelationBroadcastMode =>
diff --git
a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxMetricsApi.scala
b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxMetricsApi.scala
index f13217442f..caa2e7de4b 100644
---
a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxMetricsApi.scala
+++
b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxMetricsApi.scala
@@ -554,7 +554,8 @@ class VeloxMetricsApi extends MetricsApi with Logging {
"dataSize" -> SQLMetrics.createSizeMetric(sparkContext, "data size"),
"numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of
output rows"),
"collectTime" -> SQLMetrics.createTimingMetric(sparkContext, "time to
collect"),
- "broadcastTime" -> SQLMetrics.createTimingMetric(sparkContext, "time to
broadcast")
+ "broadcastTime" -> SQLMetrics.createTimingMetric(sparkContext, "time to
broadcast"),
+ "buildThreads" -> SQLMetrics.createMetric(sparkContext, "build threads")
)
override def genColumnarSubqueryBroadcastMetrics(
@@ -667,7 +668,10 @@ class VeloxMetricsApi extends MetricsApi with Logging {
"numOutputBytes" -> SQLMetrics.createSizeMetric(sparkContext, "number of
output bytes"),
"loadLazyVectorTime" -> SQLMetrics.createNanoTimingMetric(
sparkContext,
- "time of loading lazy vectors")
+ "time of loading lazy vectors"),
+ "buildHashTableTime" -> SQLMetrics.createTimingMetric(
+ sparkContext,
+ "time to build hash table")
)
override def genHashJoinTransformerMetricsUpdater(
diff --git
a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxSparkPlanExecApi.scala
b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxSparkPlanExecApi.scala
index f885bca2eb..863195a6b4 100644
---
a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxSparkPlanExecApi.scala
+++
b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxSparkPlanExecApi.scala
@@ -706,7 +706,8 @@ class VeloxSparkPlanExecApi extends SparkPlanExecApi with
Logging {
mode: BroadcastMode,
child: SparkPlan,
numOutputRows: SQLMetric,
- dataSize: SQLMetric): BuildSideRelation = {
+ dataSize: SQLMetric,
+ buildThreads: SQLMetric): BuildSideRelation = {
val buildKeys = mode match {
case mode1: HashedRelationBroadcastMode =>
@@ -851,6 +852,13 @@ class VeloxSparkPlanExecApi extends SparkPlanExecApi with
Logging {
numOutputRows += serialized.map(_.numRows).sum
dataSize += rawSize
+ val rawThreads =
+ math
+ .ceil(dataSize.value.toDouble /
VeloxConfig.get.veloxBroadcastHashTableBuildTargetBytes)
+ .toInt
+ val buildThreadsValue = if (rawThreads < 1) 1 else rawThreads
+ buildThreads += buildThreadsValue
+
if (useOffheapBroadcastBuildRelation) {
TaskResources.runUnsafe {
UnsafeColumnarBuildSideRelation(
@@ -858,7 +866,8 @@ class VeloxSparkPlanExecApi extends SparkPlanExecApi with
Logging {
serialized.flatMap(_.offHeapData().asScala),
mode,
newBuildKeys,
- offload)
+ offload,
+ buildThreadsValue)
}
} else {
ColumnarBuildSideRelation(
@@ -866,7 +875,8 @@ class VeloxSparkPlanExecApi extends SparkPlanExecApi with
Logging {
serialized.flatMap(_.onHeapData().asScala).toArray,
mode,
newBuildKeys,
- offload)
+ offload,
+ buildThreadsValue)
}
}
diff --git
a/backends-velox/src/main/scala/org/apache/gluten/config/VeloxConfig.scala
b/backends-velox/src/main/scala/org/apache/gluten/config/VeloxConfig.scala
index 8c39f02128..f34b3b6e62 100644
--- a/backends-velox/src/main/scala/org/apache/gluten/config/VeloxConfig.scala
+++ b/backends-velox/src/main/scala/org/apache/gluten/config/VeloxConfig.scala
@@ -62,8 +62,8 @@ class VeloxConfig(conf: SQLConf) extends GlutenConfig(conf) {
def enableBroadcastBuildOncePerExecutor: Boolean =
getConf(VELOX_BROADCAST_BUILD_HASHTABLE_ONCE_PER_EXECUTOR)
- def veloxBroadcastHashTableBuildThreads: Int =
- getConf(COLUMNAR_VELOX_BROADCAST_HASH_TABLE_BUILD_THREADS)
+ def veloxBroadcastHashTableBuildTargetBytes: Long =
+ getConf(COLUMNAR_VELOX_BROADCAST_HASH_TABLE_BUILD_TARGET_BYTES)
def veloxOrcScanEnabled: Boolean =
getConf(VELOX_ORC_SCAN_ENABLED)
@@ -206,12 +206,14 @@ object VeloxConfig extends ConfigRegistry {
.intConf
.createOptional
- val COLUMNAR_VELOX_BROADCAST_HASH_TABLE_BUILD_THREADS =
-
buildStaticConf("spark.gluten.sql.columnar.backend.velox.broadcastHashTableBuildThreads")
- .doc("The number of threads used to build the broadcast hash table. " +
- "If not set or set to 0, it will use the default number of threads
(available processors).")
- .intConf
- .createWithDefault(1)
+ val COLUMNAR_VELOX_BROADCAST_HASH_TABLE_BUILD_TARGET_BYTES =
+ buildStaticConf("spark.gluten.velox.broadcast.build.targetBytesPerThread")
+ .doc(
+ "It is used to calculate the number of hash table build threads. Based
on our testing" +
+ " across various thresholds (1MB to 128MB), we recommend a value of
32MB or 64MB," +
+ " as these consistently provided the most significant performance
gains.")
+ .bytesConf(ByteUnit.BYTE)
+ .createWithDefaultString("32MB")
val COLUMNAR_VELOX_ASYNC_TIMEOUT =
buildStaticConf("spark.gluten.sql.columnar.backend.velox.asyncTimeoutOnTaskStopping")
diff --git
a/backends-velox/src/main/scala/org/apache/gluten/execution/HashJoinExecTransformer.scala
b/backends-velox/src/main/scala/org/apache/gluten/execution/HashJoinExecTransformer.scala
index fb57848e54..ba014134d0 100644
---
a/backends-velox/src/main/scala/org/apache/gluten/execution/HashJoinExecTransformer.scala
+++
b/backends-velox/src/main/scala/org/apache/gluten/execution/HashJoinExecTransformer.scala
@@ -25,6 +25,7 @@ import org.apache.spark.sql.catalyst.optimizer.{BuildRight,
BuildSide}
import org.apache.spark.sql.catalyst.plans._
import org.apache.spark.sql.execution.{SparkPlan, SQLExecution}
import org.apache.spark.sql.execution.joins.BuildSideRelation
+import org.apache.spark.sql.execution.metric.SQLMetric
import org.apache.spark.sql.vectorized.ColumnarBatch
import io.substrait.proto.JoinRel
@@ -158,7 +159,7 @@ case class BroadcastHashJoinExecTransformer(
buildBroadcastTableId,
isNullAwareAntiJoin,
bloomFilterPushdownSize,
- VeloxConfig.get.veloxBroadcastHashTableBuildThreads
+ metrics.get("buildHashTableTime")
)
val broadcastRDD = VeloxBroadcastBuildSideRDD(sparkContext, broadcast,
context)
// FIXME: Do we have to make build side a RDD?
@@ -176,4 +177,4 @@ case class BroadcastHashJoinContext(
buildHashTableId: String,
isNullAwareAntiJoin: Boolean = false,
bloomFilterPushdownSize: Long,
- broadcastHashTableBuildThreads: Int)
+ buildHashTableTimeMetric: Option[SQLMetric] = None)
diff --git
a/backends-velox/src/main/scala/org/apache/spark/sql/execution/ColumnarBuildSideRelation.scala
b/backends-velox/src/main/scala/org/apache/spark/sql/execution/ColumnarBuildSideRelation.scala
index b106319e81..1b21ef7b42 100644
---
a/backends-velox/src/main/scala/org/apache/spark/sql/execution/ColumnarBuildSideRelation.scala
+++
b/backends-velox/src/main/scala/org/apache/spark/sql/execution/ColumnarBuildSideRelation.scala
@@ -51,7 +51,8 @@ object ColumnarBuildSideRelation {
batches: Array[Array[Byte]],
mode: BroadcastMode,
newBuildKeys: Seq[Expression] = Seq.empty,
- offload: Boolean = false): ColumnarBuildSideRelation = {
+ offload: Boolean = false,
+ buildThreads: Int = 1): ColumnarBuildSideRelation = {
val boundMode = mode match {
case HashedRelationBroadcastMode(keys, isNullAware) =>
// Bind each key to the build-side output so simple cols become
BoundReference
@@ -66,7 +67,8 @@ object ColumnarBuildSideRelation {
batches,
BroadcastModeUtils.toSafe(boundMode),
newBuildKeys,
- offload)
+ offload,
+ buildThreads)
}
}
@@ -75,7 +77,8 @@ case class ColumnarBuildSideRelation(
batches: Array[Array[Byte]],
safeBroadcastMode: SafeBroadcastMode,
newBuildKeys: Seq[Expression],
- offload: Boolean)
+ offload: Boolean,
+ buildThreads: Int)
extends BuildSideRelation
with Logging
with KnownSizeEstimation {
@@ -156,6 +159,7 @@ case class ColumnarBuildSideRelation(
broadcastContext: BroadcastHashJoinContext): (Long,
ColumnarBuildSideRelation) =
synchronized {
if (hashTableData == 0) {
+ val startTime = System.nanoTime()
val runtime = Runtimes.contextInstance(
BackendsApiManager.getBackendName,
"ColumnarBuildSideRelation#buildHashTable")
@@ -215,10 +219,15 @@ case class ColumnarBuildSideRelation(
SubstraitUtil.toNameStruct(newOutput).toByteArray,
broadcastContext.isNullAwareAntiJoin,
broadcastContext.bloomFilterPushdownSize,
- broadcastContext.broadcastHashTableBuildThreads
+ buildThreads
)
jniWrapper.close(serializeHandle)
+
+ // Update build hash table time metric
+ val elapsedTime = System.nanoTime() - startTime
+ broadcastContext.buildHashTableTimeMetric.foreach(_ += elapsedTime /
1000000)
+
(hashTableData, this)
} else {
(HashJoinBuilder.cloneHashTable(hashTableData), null)
diff --git
a/backends-velox/src/main/scala/org/apache/spark/sql/execution/unsafe/UnsafeColumnarBuildSideRelation.scala
b/backends-velox/src/main/scala/org/apache/spark/sql/execution/unsafe/UnsafeColumnarBuildSideRelation.scala
index 01fbb86bee..26195d48af 100644
---
a/backends-velox/src/main/scala/org/apache/spark/sql/execution/unsafe/UnsafeColumnarBuildSideRelation.scala
+++
b/backends-velox/src/main/scala/org/apache/spark/sql/execution/unsafe/UnsafeColumnarBuildSideRelation.scala
@@ -56,7 +56,8 @@ object UnsafeColumnarBuildSideRelation {
batches: Seq[UnsafeByteArray],
mode: BroadcastMode,
newBuildKeys: Seq[Expression] = Seq.empty,
- offload: Boolean = false): UnsafeColumnarBuildSideRelation = {
+ offload: Boolean = false,
+ buildThreads: Int = 1): UnsafeColumnarBuildSideRelation = {
val boundMode = mode match {
case HashedRelationBroadcastMode(keys, isNullAware) =>
// Bind each key to the build-side output so simple cols become
BoundReference
@@ -71,7 +72,8 @@ object UnsafeColumnarBuildSideRelation {
batches,
BroadcastModeUtils.toSafe(boundMode),
newBuildKeys,
- offload)
+ offload,
+ buildThreads)
}
}
@@ -91,7 +93,8 @@ class UnsafeColumnarBuildSideRelation(
private var batches: Seq[UnsafeByteArray],
private var safeBroadcastMode: SafeBroadcastMode,
private var newBuildKeys: Seq[Expression],
- private var offload: Boolean)
+ private var offload: Boolean,
+ private var buildThreads: Int)
extends BuildSideRelation
with Externalizable
with Logging
@@ -113,7 +116,7 @@ class UnsafeColumnarBuildSideRelation(
/** needed for serialization. */
def this() = {
- this(null, null, null, Seq.empty, false)
+ this(null, null, null, Seq.empty, false, 1)
}
private[unsafe] def getBatches(): Seq[UnsafeByteArray] = {
@@ -125,6 +128,7 @@ class UnsafeColumnarBuildSideRelation(
def buildHashTable(broadcastContext: BroadcastHashJoinContext): (Long,
BuildSideRelation) =
synchronized {
if (hashTableData == 0) {
+ val startTime = System.nanoTime()
val runtime = Runtimes.contextInstance(
BackendsApiManager.getBackendName,
"UnsafeColumnarBuildSideRelation#buildHashTable")
@@ -185,10 +189,15 @@ class UnsafeColumnarBuildSideRelation(
SubstraitUtil.toNameStruct(newOutput).toByteArray,
broadcastContext.isNullAwareAntiJoin,
broadcastContext.bloomFilterPushdownSize,
- broadcastContext.broadcastHashTableBuildThreads
+ buildThreads
)
jniWrapper.close(serializeHandle)
+
+ // Update build hash table time metric
+ val elapsedTime = System.nanoTime() - startTime
+ broadcastContext.buildHashTableTimeMetric.foreach(_ += elapsedTime /
1000000)
+
(hashTableData, this)
} else {
(HashJoinBuilder.cloneHashTable(hashTableData), null)
@@ -205,6 +214,7 @@ class UnsafeColumnarBuildSideRelation(
out.writeObject(batches.toArray)
out.writeObject(newBuildKeys)
out.writeBoolean(offload)
+ out.writeInt(buildThreads)
}
override def write(kryo: Kryo, out: Output): Unit = Utils.tryOrIOException {
@@ -213,6 +223,7 @@ class UnsafeColumnarBuildSideRelation(
kryo.writeClassAndObject(out, batches.toArray)
kryo.writeClassAndObject(out, newBuildKeys)
out.writeBoolean(offload)
+ out.writeInt(buildThreads)
}
override def readExternal(in: ObjectInput): Unit = Utils.tryOrIOException {
@@ -221,6 +232,7 @@ class UnsafeColumnarBuildSideRelation(
batches = in.readObject().asInstanceOf[Array[UnsafeByteArray]].toSeq
newBuildKeys = in.readObject().asInstanceOf[Seq[Expression]]
offload = in.readBoolean()
+ buildThreads = in.readInt()
}
override def read(kryo: Kryo, in: Input): Unit = Utils.tryOrIOException {
@@ -229,6 +241,7 @@ class UnsafeColumnarBuildSideRelation(
batches =
kryo.readClassAndObject(in).asInstanceOf[Array[UnsafeByteArray]].toSeq
newBuildKeys = kryo.readClassAndObject(in).asInstanceOf[Seq[Expression]]
offload = in.readBoolean()
+ buildThreads = in.readInt()
}
private def transformProjection: UnsafeProjection = safeBroadcastMode match {
diff --git a/cpp/velox/jni/VeloxJniWrapper.cc b/cpp/velox/jni/VeloxJniWrapper.cc
index 4705a646e2..4f8c307ada 100644
--- a/cpp/velox/jni/VeloxJniWrapper.cc
+++ b/cpp/velox/jni/VeloxJniWrapper.cc
@@ -21,6 +21,8 @@
#include <jni/JniCommon.h>
#include <velox/connectors/hive/PartitionIdGenerator.h>
#include <velox/exec/OperatorUtils.h>
+#include <folly/futures/Future.h>
+#include <folly/executors/CPUThreadPoolExecutor.h>
#include <exception>
#include "JniUdf.h"
@@ -946,7 +948,7 @@ JNIEXPORT jlong JNICALL
Java_org_apache_gluten_vectorized_HashJoinBuilder_native
jbyteArray namedStruct,
jboolean isNullAwareAntiJoin,
jlong bloomFilterPushdownSize,
- jint broadcastHashTableBuildThreads) {
+ jint numThreads) {
JNI_METHOD_START
const auto hashTableId = jStringToCString(env, tableId);
@@ -985,17 +987,7 @@ JNIEXPORT jlong JNICALL
Java_org_apache_gluten_vectorized_HashJoinBuilder_native
cb.push_back(ObjectStore::retrieve<ColumnarBatch>(handle));
}
- size_t maxThreads = broadcastHashTableBuildThreads > 0
- ? std::min((size_t)broadcastHashTableBuildThreads, (size_t)32)
- : std::min((size_t)std::thread::hardware_concurrency(), (size_t)32);
-
- // Heuristic: Each thread should process at least a certain number of
batches to justify parallelism overhead.
- // 32 batches is roughly 128k rows, which is a reasonable granularity for a
single thread.
- constexpr size_t kMinBatchesPerThread = 32;
- size_t numThreads = std::min(maxThreads, (handleCount + kMinBatchesPerThread
- 1) / kMinBatchesPerThread);
- numThreads = std::max((size_t)1, numThreads);
-
- if (numThreads <= 1) {
+ if (numThreads == 1) {
auto builder = nativeHashTableBuild(
hashJoinKeys,
names,
@@ -1020,16 +1012,20 @@ JNIEXPORT jlong JNICALL
Java_org_apache_gluten_vectorized_HashJoinBuilder_native
return gluten::getHashTableObjStore()->save(builder);
}
- std::vector<std::thread> threads;
-
+ // Use thread pool (executor) instead of creating threads directly
+ auto executor = VeloxBackend::get()->executor();
+
std::vector<std::shared_ptr<gluten::HashTableBuilder>>
hashTableBuilders(numThreads);
std::vector<std::unique_ptr<facebook::velox::exec::BaseHashTable>>
otherTables(numThreads);
+ std::vector<folly::Future<folly::Unit>> futures;
+ futures.reserve(numThreads);
for (size_t t = 0; t < numThreads; ++t) {
size_t start = (handleCount * t) / numThreads;
size_t end = (handleCount * (t + 1)) / numThreads;
- threads.emplace_back([&, t, start, end]() {
+ // Submit task to thread pool
+ auto future = folly::via(executor, [&, t, start, end]() {
std::vector<std::shared_ptr<gluten::ColumnarBatch>> threadBatches;
for (size_t i = start; i < end; ++i) {
threadBatches.push_back(cb[i]);
@@ -1050,11 +1046,12 @@ JNIEXPORT jlong JNICALL
Java_org_apache_gluten_vectorized_HashJoinBuilder_native
hashTableBuilders[t] = std::move(builder);
otherTables[t] = std::move(hashTableBuilders[t]->uniqueTable());
});
+
+ futures.push_back(std::move(future));
}
- for (auto& thread : threads) {
- thread.join();
- }
+ // Wait for all tasks to complete
+ folly::collectAll(futures).wait();
auto mainTable = std::move(otherTables[0]);
std::vector<std::unique_ptr<facebook::velox::exec::BaseHashTable>> tables;
diff --git
a/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/SparkPlanExecApi.scala
b/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/SparkPlanExecApi.scala
index 73f2b57b9d..ce0a79f0bc 100644
---
a/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/SparkPlanExecApi.scala
+++
b/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/SparkPlanExecApi.scala
@@ -435,7 +435,8 @@ trait SparkPlanExecApi {
mode: BroadcastMode,
child: SparkPlan,
numOutputRows: SQLMetric,
- dataSize: SQLMetric): BuildSideRelation
+ dataSize: SQLMetric,
+ buildThreads: SQLMetric = null): BuildSideRelation
def doCanonicalizeForBroadcastMode(mode: BroadcastMode): BroadcastMode = {
mode.canonicalized
diff --git
a/gluten-substrait/src/main/scala/org/apache/spark/sql/execution/ColumnarBroadcastExchangeExec.scala
b/gluten-substrait/src/main/scala/org/apache/spark/sql/execution/ColumnarBroadcastExchangeExec.scala
index 26a4ef3919..4b2890d51d 100644
---
a/gluten-substrait/src/main/scala/org/apache/spark/sql/execution/ColumnarBroadcastExchangeExec.scala
+++
b/gluten-substrait/src/main/scala/org/apache/spark/sql/execution/ColumnarBroadcastExchangeExec.scala
@@ -75,7 +75,8 @@ case class ColumnarBroadcastExchangeExec(mode: BroadcastMode,
child: SparkPlan)
mode,
child,
longMetric("numOutputRows"),
- longMetric("dataSize"))
+ longMetric("dataSize"),
+ metrics.getOrElse("buildThreads", null))
}
val broadcasted = GlutenTimeMetric.millis(longMetric("broadcastTime"))
{
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]