Copilot commented on code in PR #11993:
URL: https://github.com/apache/gluten/pull/11993#discussion_r3424636587
##########
cpp/velox/operators/hashjoin/HashTableBuilder.cc:
##########
@@ -181,7 +181,7 @@ bool HashTableBuilder::abandonHashBuildDedupEarly(int64_t numDistinct) const {
void HashTableBuilder::abandonHashBuildDedup() {
abandonHashBuildDedup_ = true;
- uniqueTable_->setAllowDuplicates(true);
+ // uniqueTable_->setAllowDuplicates(true);
Review Comment:
`abandonHashBuildDedup()` marks dedup as abandoned, but commenting out
`uniqueTable_->setAllowDuplicates(true)` leaves the hash table in a
no-duplicates mode. That can silently drop duplicate build rows for the same
key, producing wrong join results once dedup is abandoned.
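To make the failure mode concrete, here is a toy model in C++. The `ToyJoinTable` class and the `compareModes()` helper are hypothetical and are not Velox's `HashTable` API. The model assumes, as described above, that a table in no-duplicates mode keeps only the first build row for each key, and that `allowDuplicates` plays the role of `setAllowDuplicates(true)`:

```cpp
#include <cstddef>
#include <initializer_list>
#include <string>
#include <unordered_map>
#include <utility>
#include <vector>

// Toy model of a join build table; NOT Velox's HashTable.
// allowDuplicates stands in for setAllowDuplicates(true).
class ToyJoinTable {
 public:
  explicit ToyJoinTable(bool allowDuplicates) : allowDuplicates_(allowDuplicates) {}

  void insert(int key, std::string payload) {
    auto& rows = table_[key];
    // In no-duplicates mode, a later row whose key already exists is dropped.
    if (!allowDuplicates_ && !rows.empty()) {
      return;
    }
    rows.push_back(std::move(payload));
  }

  // Number of output rows an inner join emits for one probe key.
  std::size_t matchCount(int key) const {
    auto it = table_.find(key);
    return it == table_.end() ? 0 : it->second.size();
  }

 private:
  bool allowDuplicates_;
  std::unordered_map<int, std::vector<std::string>> table_;
};

// Builds the same two rows for key 1 in both modes; returns {dedup, withDups}.
std::pair<std::size_t, std::size_t> compareModes() {
  ToyJoinTable dedup(/*allowDuplicates=*/false);
  ToyJoinTable withDups(/*allowDuplicates=*/true);
  for (auto* t : {&dedup, &withDups}) {
    t->insert(1, "a");
    t->insert(1, "b");
  }
  return {dedup.matchCount(1), withDups.matchCount(1)};
}
```

In this model, a probe on key 1 matches one build row while dedup is still active and two once duplicates are allowed. That difference is the row an inner join would silently lose.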
##########
backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxSparkPlanExecApi.scala:
##########
@@ -886,6 +912,167 @@ class VeloxSparkPlanExecApi extends SparkPlanExecApi with Logging {
offload,
buildThreadsValue)
}
+
+ // Check if we should build hash table on driver (Spark-native approach)
+ // Only do this for HashedRelationBroadcastMode and when offload is enabled
+ val shouldBuildOnDriver = VeloxConfig.get.enableDriverSideBroadcastHashTableBuild &&
+ mode.isInstanceOf[HashedRelationBroadcastMode] &&
+ offload
+
+ if (shouldBuildOnDriver) {
+ // Try to get broadcast join context from logical plan tag
+ // In multi-join scenarios, there may be multiple contexts. Find the one that matches
+ // the current broadcast child's output.
+ val joinContextOpt: Option[BroadcastJoinContextInfo] =
+ findLogicalLink(child).flatMap {
+ logicalPlan =>
+ logicalPlan.getTagValue(
+ BroadcastJoinContextTag.BROADCAST_JOIN_CONTEXT
+ ).flatMap {
+ contexts =>
+ val childOutputSet = AttributeSet(newOutput)
+ // Find the context whose build output matches the child's output
+ contexts.find {
+ ctx =>
+ val buildOutputMatches = childOutputSet.subsetOf(ctx.buildOutputSet) &&
+ ctx.buildOutputSet.subsetOf(childOutputSet)
+ buildOutputMatches
+ }
+ }
+ }
+
+ joinContextOpt match {
+ case Some(joinContext) =>
+ // We have join context information - build hash table on driver
+ logInfo(
+ s"Building hash table on driver in BroadcastExchangeExec " +
+ s"with join context: $joinContext")
+
+ // Create a broadcast ID for this hash table
+ val broadcastId = s"broadcast_exchange_${child.id}_${System.identityHashCode(mode)}"
+
+ // Convert Spark JoinType to Substrait JoinType
+ val substraitJoinType = joinContext.joinType match {
+ case _: InnerLike =>
+ JoinRel.JoinType.JOIN_TYPE_INNER
+ case FullOuter =>
+ JoinRel.JoinType.JOIN_TYPE_OUTER
+ case LeftOuter |
+ RightOuter =>
+ JoinRel.JoinType.JOIN_TYPE_LEFT
+ case LeftSemi |
Review Comment:
`RightOuter` is currently mapped to `JOIN_TYPE_LEFT`, but the native build
path converts `JOIN_TYPE_RIGHT` to `velox::core::JoinType::kRight` (see
JniHashTable.cc). Using `JOIN_TYPE_LEFT` for right outer joins can build a hash
table with the wrong join semantics and lead to incorrect results when
driver-side build is enabled.
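Here is a minimal sketch of a mapping that keeps the two outer-join sides distinct. It is written in C++ with hypothetical local enums that only mirror the names in the diff; they are not the real Spark or Substrait types. It assumes the native conversion of `JOIN_TYPE_RIGHT` to `kRight` cited above. Semi and anti joins are intentionally left out:

```cpp
#include <stdexcept>

// Hypothetical enums mirroring the names used in the diff; not the real
// Spark JoinType or Substrait JoinRel.JoinType definitions.
enum class SparkJoinType { Inner, FullOuter, LeftOuter, RightOuter };
enum class SubstraitJoinType { JOIN_TYPE_INNER, JOIN_TYPE_OUTER, JOIN_TYPE_LEFT, JOIN_TYPE_RIGHT };

// Each outer side gets its own Substrait type, so the native build path can
// turn JOIN_TYPE_RIGHT into velox::core::JoinType::kRight.
SubstraitJoinType toSubstrait(SparkJoinType t) {
  switch (t) {
    case SparkJoinType::Inner:
      return SubstraitJoinType::JOIN_TYPE_INNER;
    case SparkJoinType::FullOuter:
      return SubstraitJoinType::JOIN_TYPE_OUTER;
    case SparkJoinType::LeftOuter:
      return SubstraitJoinType::JOIN_TYPE_LEFT;
    case SparkJoinType::RightOuter:
      return SubstraitJoinType::JOIN_TYPE_RIGHT;
  }
  throw std::invalid_argument("unsupported join type");
}
```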
##########
cpp/velox/jni/JniHashTable.cc:
##########
@@ -163,4 +164,89 @@ long getJoin(const std::string& hashTableId) {
return JniHashTableContext::getInstance().callJavaGet(hashTableId);
}
+size_t serializedHashTableSize(std::shared_ptr<HashTableBuilder> builder) {
+ VELOX_CHECK_NOT_NULL(builder, "Hash table builder cannot be null");
+
+ auto hashTable = builder->hashTable();
+ VELOX_CHECK_NOT_NULL(hashTable, "Hash table cannot be null");
+
+ auto* hashTableFalse = dynamic_cast<facebook::velox::exec::HashTable<false>*>(hashTable.get());
+ if (hashTableFalse != nullptr) {
+ return HashTableSerializer::serializedSize<false>(hashTableFalse);
+ }
+
+ auto* hashTableTrue = dynamic_cast<facebook::velox::exec::HashTable<true>*>(hashTable.get());
+ VELOX_CHECK_NOT_NULL(hashTableTrue, "Hash table must be either HashTable<false> or HashTable<true>");
+ return HashTableSerializer::serializedSize<true>(hashTableTrue);
+}
+
+void serializeHashTableTo(std::shared_ptr<HashTableBuilder> builder, uint8_t* data, size_t size) {
+ VELOX_CHECK_NOT_NULL(builder, "Hash table builder cannot be null");
+ VELOX_CHECK_NOT_NULL(data, "Serialized buffer cannot be null");
+
+ auto hashTable = builder->hashTable();
+ VELOX_CHECK_NOT_NULL(hashTable, "Hash table cannot be null");
+
+ auto* hashTableFalse = dynamic_cast<facebook::velox::exec::HashTable<false>*>(hashTable.get());
+ if (hashTableFalse != nullptr) {
+ HashTableSerializer::serializeTo<false>(hashTableFalse, data, size);
+ return;
+ }
+
+ auto* hashTableTrue = dynamic_cast<facebook::velox::exec::HashTable<true>*>(hashTable.get());
+ VELOX_CHECK_NOT_NULL(hashTableTrue, "Hash table must be either HashTable<false> or HashTable<true>");
+ HashTableSerializer::serializeTo<true>(hashTableTrue, data, size);
+}
+
+std::shared_ptr<HashTableBuilder> deserializeHashTable(
+ const uint8_t* data,
+ size_t size,
+ facebook::velox::memory::MemoryPool* memoryPool,
+ bool ignoreNullKeys,
+ bool joinHasNullKeys) {
+ VELOX_CHECK_NOT_NULL(data, "Serialized data cannot be null");
+ VELOX_CHECK_GT(size, 0, "Invalid data size");
+
+ auto pool = memoryPool ? memoryPool->addLeafChild("deserializeHashTable") : defaultLeafVeloxMemoryPool();
Review Comment:
`HashTableBuilder` stores the provided memory pool as a raw pointer
(`pool_`). When `memoryPool` is non-null, `addLeafChild()` returns a new pool
owned only by the returned `shared_ptr`; if that `shared_ptr` doesn't outlive
the builder, this becomes a potential use-after-free. Prefer using the provided
`memoryPool` directly (caller-owned), or ensure the child pool's lifetime is
tied to the returned builder.
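One way to tie the child pool's lifetime to the builder is sketched below with hypothetical `Pool` and `Builder` stand-ins, not the real Velox/Gluten classes. The idea is to bundle the `shared_ptr` to the pool and the builder into one heap object. The function then returns a `shared_ptr<Builder>` built with the standard aliasing constructor, so the pool cannot be freed while any builder reference is still alive:

```cpp
#include <memory>
#include <string>
#include <utility>

// Hypothetical stand-ins; the real memory pool and HashTableBuilder differ.
struct Pool {
  std::string name;
};

// Like HashTableBuilder, this keeps only a raw, non-owning pool pointer.
struct Builder {
  explicit Builder(Pool* pool) : pool_(pool) {}
  Pool* pool_;
};

// Owns the child pool and the builder together. `pool` is declared first, so
// it is constructed before `builder` and destroyed after it.
struct BuilderWithPool {
  explicit BuilderWithPool(std::shared_ptr<Pool> p)
      : pool(std::move(p)), builder(pool.get()) {}
  std::shared_ptr<Pool> pool;
  Builder builder;
};

// The aliasing constructor yields a shared_ptr<Builder> that shares ownership
// of the whole bundle, so the pool lives as long as any builder reference.
std::shared_ptr<Builder> makeBuilder(std::shared_ptr<Pool> childPool) {
  auto bundle = std::make_shared<BuilderWithPool>(std::move(childPool));
  return std::shared_ptr<Builder>(bundle, &bundle->builder);
}
```

Callers keep using a plain `shared_ptr<Builder>`. The pool is released only when the last copy of that pointer goes away.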
##########
ep/build-velox/src/get-velox.sh:
##########
@@ -17,9 +17,9 @@
set -exu
CURRENT_DIR=$(cd "$(dirname "$BASH_SOURCE")"; pwd)
-VELOX_REPO=https://github.com/IBM/velox.git
-VELOX_BRANCH=dft-2026_06_06
-VELOX_ENHANCED_BRANCH=ibm-2026_06_06
+VELOX_REPO=https://github.com/JkSelf/velox.git
+VELOX_BRANCH=dft-2026_06_06-serialize-hashtable
+VELOX_ENHANCED_BRANCH=ibm-2026_06_06-serialize-hashtable
Review Comment:
This switches the default Velox repo/branch to a personal fork
(`JkSelf/velox`). That makes builds non-reproducible and risks breakage if the
fork/branch is deleted or rebased. Defaults should point to an
official/upstream repo/branch; contributors can still override via
`--velox_repo/--velox_branch` when testing.
##########
backends-velox/src/main/scala/org/apache/gluten/execution/SerializedBroadcastHashTable.scala:
##########
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gluten.execution
+
+import org.apache.gluten.vectorized.HashJoinBuilder
+
+import org.apache.spark.sql.execution.joins.BuildSideRelation
+import org.apache.spark.sql.execution.unsafe.UnsafeByteArray
+
+import java.io.{Externalizable, ObjectInput, ObjectOutput}
+
+/**
+ * Serialized broadcast hash table that can be efficiently broadcast to executors. This is built on
+ * the driver and contains the serialized hash table data.
+ */
+class SerializedBroadcastHashTable(
+ var serializedData: UnsafeByteArray,
+ var numRows: Long,
+ var ignoreNullKeys: Boolean,
+ var joinHasNullKeys: Boolean,
+ var bloomFilterBlocksByteSize: Long,
+ var hashProbeDynamicFiltersProduced: Long,
+ var buildSideRelation: BuildSideRelation)
+ extends Externalizable {
+
+ def this() = this(null, 0, false, false, 0, 0, null) // Required for Externalizable
+
+ override def writeExternal(out: ObjectOutput): Unit = {
+ out.writeLong(numRows)
+ out.writeBoolean(ignoreNullKeys)
+ out.writeBoolean(joinHasNullKeys)
+ out.writeLong(bloomFilterBlocksByteSize)
+ out.writeLong(hashProbeDynamicFiltersProduced)
+ serializedData.writeExternal(out)
+ out.writeObject(buildSideRelation)
+ }
Review Comment:
`SerializedBroadcastHashTable` serializes and broadcasts `buildSideRelation`
along with the serialized hash table bytes. For `ColumnarBuildSideRelation` /
`UnsafeColumnarBuildSideRelation`, that likely includes the original
broadcasted batch data, which can substantially inflate broadcast size and
memory (raw data + serialized hash table). Consider minimizing what gets
serialized here (e.g., store only what DPP needs, or gate serializing the
build-side payload behind a config).
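Here is a rough sketch of the config-gated option, written in C++ to match the native code in this PR. `BroadcastPayload`, `serialize()` and the `includeBuildSide` flag are all hypothetical. A Scala `writeExternal` could follow the same shape: write a presence flag, then write the build-side payload only when the flag is enabled:

```cpp
#include <cstdint>
#include <cstring>
#include <vector>

// Hypothetical payload; field names echo SerializedBroadcastHashTable, but
// this is not Gluten's class.
struct BroadcastPayload {
  std::int64_t numRows = 0;
  std::vector<std::uint8_t> hashTableBytes;
  std::vector<std::uint8_t> buildSideBytes;  // raw build-side batches
};

namespace {
// Appends v in native byte order (fine for a same-platform sketch).
void appendInt64(std::vector<std::uint8_t>& out, std::int64_t v) {
  std::uint8_t buf[sizeof(v)];
  std::memcpy(buf, &v, sizeof(v));
  out.insert(out.end(), buf, buf + sizeof(v));
}

// Length-prefixed byte array.
void appendBytes(std::vector<std::uint8_t>& out, const std::vector<std::uint8_t>& bytes) {
  appendInt64(out, static_cast<std::int64_t>(bytes.size()));
  out.insert(out.end(), bytes.begin(), bytes.end());
}
}  // namespace

// includeBuildSide stands in for a hypothetical config flag.
std::vector<std::uint8_t> serialize(const BroadcastPayload& p, bool includeBuildSide) {
  std::vector<std::uint8_t> out;
  appendInt64(out, p.numRows);
  appendBytes(out, p.hashTableBytes);
  out.push_back(static_cast<std::uint8_t>(includeBuildSide ? 1 : 0));  // presence flag
  if (includeBuildSide) {
    appendBytes(out, p.buildSideBytes);
  }
  return out;
}
```

Take a 100-byte table with 1,000 bytes of build-side data. In this toy encoding, turning the flag off shrinks the serialized form from 1,125 bytes to 117.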
##########
backends-velox/src/main/scala/org/apache/gluten/execution/VeloxBroadcastBuildSideCache.scala:
##########
@@ -55,6 +60,23 @@ object VeloxBroadcastBuildSideCache
.removalListener(this)
.build[String, BroadcastHashTable]()
+ // Cache for driver-side serialized hash tables to avoid rebuilding for reuse exchange
+ private val driverSerializedCache: Cache[String, SerializedBroadcastHashTable] =
+ Caffeine.newBuilder
+ .expireAfterAccess(expiredTime, TimeUnit.SECONDS)
+ .removalListener(
+ new RemovalListener[String, SerializedBroadcastHashTable] {
+ override def onRemoval(
+ key: String,
+ value: SerializedBroadcastHashTable,
+ cause: RemovalCause): Unit = {
+ if (value != null && value.serializedData != null) {
+ value.serializedData.release()
+ }
Review Comment:
Releasing `value.serializedData` from the Caffeine eviction listener is
unsafe because the cached `SerializedBroadcastHashTable` instance is also
returned and embedded in the broadcast value. If the cache entry expires/evicts
while the broadcast is still in use (or in local mode where tasks may share the
same JVM object), this will null out the buffer and break
deserialization/probing.
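Here is a sketch of an ownership model that avoids this. It swaps in reference counting for the explicit `release()` in the eviction listener. `SerializedBuffer` and `DriverCache` are hypothetical C++ stand-ins. On the JVM side, the equivalent would need some reference count on the buffer. That mechanism is an assumption and is not shown in this PR. With it, eviction only drops the cache's reference, and the bytes are freed by whichever holder lets go last:

```cpp
#include <cstddef>
#include <cstdint>
#include <memory>
#include <string>
#include <unordered_map>
#include <utility>
#include <vector>

// Hypothetical buffer; its destructor is the only place the bytes are freed,
// replacing an explicit release() call.
struct SerializedBuffer {
  SerializedBuffer(std::size_t n, bool* freedFlag) : bytes(n), freed(freedFlag) {}
  ~SerializedBuffer() { *freed = true; }
  std::vector<std::uint8_t> bytes;
  bool* freed;
};

// Toy cache: evicting an entry drops the cache's reference instead of
// freeing memory that a broadcast value may still be using.
class DriverCache {
 public:
  void put(const std::string& key, std::shared_ptr<SerializedBuffer> buf) {
    entries_[key] = std::move(buf);
  }
  std::shared_ptr<SerializedBuffer> get(const std::string& key) const {
    auto it = entries_.find(key);
    return it == entries_.end() ? nullptr : it->second;
  }
  void evict(const std::string& key) { entries_.erase(key); }

 private:
  std::unordered_map<std::string, std::shared_ptr<SerializedBuffer>> entries_;
};
```

If an entry expires while a broadcast still holds the buffer, the bytes stay valid. They are freed only after the broadcast drops its reference too.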
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.