Copilot commented on code in PR #11993:
URL: https://github.com/apache/gluten/pull/11993#discussion_r3509980953


##########
backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxSparkPlanExecApi.scala:
##########
@@ -886,6 +912,167 @@ class VeloxSparkPlanExecApi extends SparkPlanExecApi with 
Logging {
         offload,
         buildThreadsValue)
     }
+
+    // Check if we should build hash table on driver (Spark-native approach)
+    // Only do this for HashedRelationBroadcastMode and when offload is enabled
+    val shouldBuildOnDriver = 
VeloxConfig.get.enableDriverSideBroadcastHashTableBuild &&
+      mode.isInstanceOf[HashedRelationBroadcastMode] &&
+      offload
+
+    if (shouldBuildOnDriver) {
+      // Try to get broadcast join context from logical plan tag
+      // In multi-join scenarios, there may be multiple contexts. Find the one 
that matches
+      // the current broadcast child's output.
+      val joinContextOpt: Option[BroadcastJoinContextInfo] =
+        findLogicalLink(child).flatMap {
+          logicalPlan =>
+            logicalPlan.getTagValue(
+              BroadcastJoinContextTag.BROADCAST_JOIN_CONTEXT
+            ).flatMap {
+              contexts =>
+                val childOutputSet = AttributeSet(newOutput)
+                // Find the context whose build output matches the child's 
output
+                contexts.find {
+                  ctx =>
+                    val buildOutputMatches = 
childOutputSet.subsetOf(ctx.buildOutputSet) &&
+                      ctx.buildOutputSet.subsetOf(childOutputSet)
+                    buildOutputMatches
+                }

Review Comment:
   `contexts.find` selects the first matching `BroadcastJoinContextInfo` when 
multiple joins share the same broadcast build-side output. With exchange reuse, 
this can pick the wrong join context (different join keys / conditions), 
causing the driver-built hash table to be constructed with incorrect keys and 
producing wrong results. Prefer disabling driver-side build when the context 
match is missing or ambiguous (0 or >1 matches), and only proceed when exactly 
one context matches the build-side output.



##########
backends-velox/src/main/scala/org/apache/gluten/execution/SerializedBroadcastHashTable.scala:
##########
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gluten.execution
+
+import org.apache.gluten.vectorized.HashJoinBuilder
+
+import org.apache.spark.sql.execution.joins.BuildSideRelation
+import org.apache.spark.sql.execution.unsafe.JniUnsafeByteBuffer
+import org.apache.spark.sql.execution.unsafe.UnsafeByteArray
+
+import java.io.{Externalizable, ObjectInput, ObjectOutput}
+
+/**
+ * Serialized broadcast hash table that can be efficiently broadcast to 
executors. This is built on
+ * the driver and contains the serialized hash table data.
+ */
+class SerializedBroadcastHashTable(
+    var serializedData: UnsafeByteArray,
+    var numRows: Long,
+    var ignoreNullKeys: Boolean,
+    var joinHasNullKeys: Boolean,
+    var droppedDuplicates: Boolean,
+    var bloomFilterBlocksByteSize: Long,
+    var hashProbeDynamicFiltersProduced: Long,
+    var buildSideRelation: BuildSideRelation)
+  extends Externalizable {
+
+  def this() = this(null, 0, false, false, false, 0, 0, null) // Required for 
Externalizable
+
+  override def writeExternal(out: ObjectOutput): Unit = {
+    out.writeLong(numRows)
+    out.writeBoolean(ignoreNullKeys)
+    out.writeBoolean(joinHasNullKeys)
+    out.writeBoolean(droppedDuplicates)
+    out.writeLong(bloomFilterBlocksByteSize)
+    out.writeLong(hashProbeDynamicFiltersProduced)
+    serializedData.writeExternal(out)
+    out.writeObject(buildSideRelation)
+  }
+
+  override def readExternal(in: ObjectInput): Unit = {
+    numRows = in.readLong()
+    ignoreNullKeys = in.readBoolean()
+    joinHasNullKeys = in.readBoolean()
+    droppedDuplicates = in.readBoolean()
+    bloomFilterBlocksByteSize = in.readLong()
+    hashProbeDynamicFiltersProduced = in.readLong()
+    val data = new UnsafeByteArray()
+    data.readExternal(in)
+    serializedData = data
+    buildSideRelation = in.readObject().asInstanceOf[BuildSideRelation]
+  }
+
+  /**
+   * Deserialize the hash table on executor side. The serialized Velox hash 
table is already in a
+   * prepared, probe-ready form, so executor side only needs deserialization 
without re-running
+   * prepareJoinTable.
+   *
+   * @return
+   *   Hash table builder handle
+   */
+  def deserialize(): Long = {
+    HashJoinBuilder.deserializeHashTableDirect(
+      serializedData.address(),
+      Math.toIntExact(serializedData.size()),
+      ignoreNullKeys,
+      joinHasNullKeys)
+  }
+
+  /** Get the size of serialized data in bytes. */
+  def sizeInBytes: Long = serializedData.size()
+}
+
+object SerializedBroadcastHashTable {
+  def apply(
+      serializedData: UnsafeByteArray,
+      numRows: Long,
+      ignoreNullKeys: Boolean,
+      joinHasNullKeys: Boolean,
+      droppedDuplicates: Boolean,
+      bloomFilterBlocksByteSize: Long,
+      hashProbeDynamicFiltersProduced: Long,
+      buildSideRelation: BuildSideRelation): SerializedBroadcastHashTable =
+    new SerializedBroadcastHashTable(
+      serializedData,
+      numRows,
+      ignoreNullKeys,
+      joinHasNullKeys,
+      droppedDuplicates,
+      bloomFilterBlocksByteSize,
+      hashProbeDynamicFiltersProduced,
+      buildSideRelation)
+
+  /**
+   * Build and serialize a hash table on the driver.
+   *
+   * @param hashTableHandle
+   *   Handle to the built hash table
+   * @param buildSideRelation
+   *   The build side relation for metadata
+   * @return
+   *   Serialized broadcast hash table
+   */
+  def fromHashTable(
+      hashTableHandle: Long,
+      buildSideRelation: BuildSideRelation,
+      droppedDuplicates: Boolean,
+      numRows: Long): SerializedBroadcastHashTable = {
+    try {
+      val serializedSize = 
HashJoinBuilder.serializedHashTableSizeDirect(hashTableHandle)
+      val byteBuffer = JniUnsafeByteBuffer.allocate(serializedSize)
+      HashJoinBuilder.serializeHashTableDirect(
+        hashTableHandle,
+        byteBuffer.address(),
+        byteBuffer.size())

Review Comment:
   `SerializedBroadcastHashTable` ultimately deserializes using an `int` size 
(see `deserialize()` calling `Math.toIntExact(serializedData.size())`), but 
`fromHashTable` allows allocating/serializing arbitrarily large buffers via 
`JniUnsafeByteBuffer.allocate(serializedSize)`. Add an explicit upper bound 
check to fail fast with a clear error when the serialized hash table exceeds 
`Int.MaxValue` bytes (otherwise serialization/deserialization can overflow or 
throw at runtime).



##########
backends-velox/src/main/scala/org/apache/gluten/execution/VeloxBroadcastBuildSideCache.scala:
##########
@@ -58,6 +66,23 @@ object VeloxBroadcastBuildSideCache
       .removalListener(this)
       .build[String, BroadcastHashTable]()
 
+  // Cache for driver-side serialized hash tables to avoid rebuilding for 
reuse exchange
+  private val driverSerializedCache: Cache[String, 
SerializedBroadcastHashTable] =
+    Caffeine.newBuilder
+      .expireAfterAccess(expiredTime, TimeUnit.SECONDS)
+      .removalListener(
+        new RemovalListener[String, SerializedBroadcastHashTable] {
+          override def onRemoval(
+              key: String,
+              value: SerializedBroadcastHashTable,
+              cause: RemovalCause): Unit = {
+            if (value != null && value.serializedData != null) {
+              value.serializedData.release()
+            }
+          }
+        }

Review Comment:
   `driverSerializedCache` evicts entries by calling 
`value.serializedData.release()`, but the cached `SerializedBroadcastHashTable` 
instance is also returned and embedded in the Spark broadcast variable. 
Evicting the cache while the broadcast is still live will close the underlying 
ArrowBuf and can trigger use-after-free / crashes when executors fetch or 
deserialize the broadcast.



##########
backends-velox/src/main/scala/org/apache/gluten/execution/SerializedBroadcastHashTable.scala:
##########
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gluten.execution
+
+import org.apache.gluten.vectorized.HashJoinBuilder
+
+import org.apache.spark.sql.execution.joins.BuildSideRelation
+import org.apache.spark.sql.execution.unsafe.JniUnsafeByteBuffer
+import org.apache.spark.sql.execution.unsafe.UnsafeByteArray
+
+import java.io.{Externalizable, ObjectInput, ObjectOutput}
+
+/**
+ * Serialized broadcast hash table that can be efficiently broadcast to 
executors. This is built on
+ * the driver and contains the serialized hash table data.
+ */
+class SerializedBroadcastHashTable(
+    var serializedData: UnsafeByteArray,
+    var numRows: Long,
+    var ignoreNullKeys: Boolean,
+    var joinHasNullKeys: Boolean,
+    var droppedDuplicates: Boolean,
+    var bloomFilterBlocksByteSize: Long,
+    var hashProbeDynamicFiltersProduced: Long,
+    var buildSideRelation: BuildSideRelation)
+  extends Externalizable {
+
+  def this() = this(null, 0, false, false, false, 0, 0, null) // Required for 
Externalizable
+
+  override def writeExternal(out: ObjectOutput): Unit = {
+    out.writeLong(numRows)
+    out.writeBoolean(ignoreNullKeys)
+    out.writeBoolean(joinHasNullKeys)
+    out.writeBoolean(droppedDuplicates)
+    out.writeLong(bloomFilterBlocksByteSize)
+    out.writeLong(hashProbeDynamicFiltersProduced)
+    serializedData.writeExternal(out)
+    out.writeObject(buildSideRelation)
+  }

Review Comment:
   `SerializedBroadcastHashTable.writeExternal` serializes both the serialized 
hash table bytes and the full `buildSideRelation`. For 
`ColumnarBuildSideRelation`/`UnsafeColumnarBuildSideRelation`, that relation 
itself contains the full broadcasted build-side data, so this increases 
broadcast payload size (raw data + serialized hash table), which can negate 
some of the intended benefit and risk hitting broadcast size limits. Consider 
replacing `buildSideRelation` with a lightweight metadata-only relation, or 
persisting only what’s needed for `transform()`/DPP rather than the full 
build-side batches.



##########
ep/build-velox/src/get-velox.sh:
##########
@@ -17,8 +17,8 @@
 set -exu
 
 CURRENT_DIR=$(cd "$(dirname "$BASH_SOURCE")"; pwd)
-VELOX_REPO=https://github.com/IBM/velox.git
-VELOX_BRANCH=dft-2026_06_30
+VELOX_REPO=https://github.com/JkSelf/velox.git
+VELOX_BRANCH=dft-2026_06_30_hashtable_ser
 VELOX_ENHANCED_BRANCH=ibm-2026_06_30

Review Comment:
   The default `VELOX_REPO`/`VELOX_BRANCH` is changed to a personal 
fork/branch. This makes builds non-reproducible for other contributors and CI, 
and is risky for an Apache project. Prefer keeping an upstream/default repo 
(e.g., IBM/velox) and use `--velox_repo/--velox_branch` (or 
`UPSTREAM_VELOX_PR_ID`) when testing a fork/PR.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to