Copilot commented on code in PR #11993:
URL: https://github.com/apache/gluten/pull/11993#discussion_r3418003364


##########
backends-velox/src/main/scala/org/apache/gluten/execution/VeloxBroadcastBuildSideCache.scala:
##########
@@ -75,6 +86,108 @@ object VeloxBroadcastBuildSideCache
       )
   }
 
+  /**
+   * Build hash table on driver and serialize for broadcasting. This version 
is called from
+   * BroadcastExchangeExec and doesn't need a broadcast variable.
+   *
+   * This is the Spark-native approach where hash table is built in 
BroadcastExchangeExec.
+   */
+  def buildAndSerializeOnDriverInBroadcastExchange(
+      relation: BuildSideRelation,
+      broadcastContext: BroadcastHashJoinContext): 
SerializedBroadcastHashTable = {
+
+    val broadcastId = broadcastContext.buildHashTableId
+
+    val cached = driverSerializedCache.getIfPresent(broadcastId)
+    if (cached != null) {
+      logInfo(s"Reusing cached serialized hash table for broadcast ID: 
$broadcastId")
+      return cached
+    }
+
+    def resetRelation(): Unit = relation match {
+      case r: ColumnarBuildSideRelation => r.reset()
+      case r: UnsafeColumnarBuildSideRelation => r.reset()
+      case _ =>
+    }
+
+    relation.synchronized {
+      val cachedAfterLock = driverSerializedCache.getIfPresent(broadcastId)
+      if (cachedAfterLock != null) {
+        logInfo(s"Reusing cached serialized hash table for broadcast ID: 
$broadcastId (after lock)")
+        return cachedAfterLock
+      }
+
+      logInfo(
+        s"Building hash table on driver in BroadcastExchangeExec " +
+          s"for broadcast ID: $broadcastId")
+
+      val backendName = BackendsApiManager.getBackendName
+
+      val runtime = org.apache.gluten.runtime.Runtime.createStandalone(
+        backendName,
+        "DriverBroadcastHashTableBuild"
+      )
+
+      try {
+        resetRelation()
+        val startBuildTime = System.currentTimeMillis()
+        val (hashTableHandle, _) = relation match {
+          case r: ColumnarBuildSideRelation =>
+            r.buildHashTableWithRuntime(broadcastContext, runtime)
+          case r: UnsafeColumnarBuildSideRelation =>
+            r.buildHashTableWithRuntime(broadcastContext, runtime)
+          case other =>
+            throw new IllegalArgumentException(
+              s"Unsupported relation type for driver-side build: 
${other.getClass.getName}")
+        }
+        try {
+          val buildTimeMs = System.currentTimeMillis() - startBuildTime
+          broadcastContext.buildHashTableTimeMetric.foreach(_ += buildTimeMs)
+          val startSerializeTime = System.currentTimeMillis()

Review Comment:
   `buildHashTableTimeMetric` is being updated twice for driver-side builds: 
once inside `buildHashTableWithRuntime` (it already does 
`broadcastContext.buildHashTableTimeMetric.foreach(...)`), and again here. This 
will double-count driver build time in `driverBuildHashTableTime`. Keep only one 
of the two updates so that each build is counted exactly once.



##########
backends-velox/src/main/scala/org/apache/gluten/execution/VeloxBroadcastBuildSideCache.scala:
##########
@@ -55,6 +60,12 @@ object VeloxBroadcastBuildSideCache
       .removalListener(this)
       .build[String, BroadcastHashTable]()
 
+  // Cache for driver-side serialized hash tables to avoid rebuilding for 
reuse exchange
+  private val driverSerializedCache: Cache[String, 
SerializedBroadcastHashTable] =
+    Caffeine.newBuilder
+      .expireAfterAccess(expiredTime, TimeUnit.SECONDS)
+      .build[String, SerializedBroadcastHashTable]()

Review Comment:
   `driverSerializedCache` holds `SerializedBroadcastHashTable` instances whose 
`serializedData` is an off-heap `UnsafeByteArray` (ArrowBuf). Without a removal 
listener that releases the buffer, evicted or invalidated cache entries will leak 
off-heap memory until finalization/GC, which is unreliable under Spark 
workloads. Register a removal listener that releases `serializedData`, as the 
existing build-side cache already does with `.removalListener(this)`.



##########
backends-velox/src/main/scala/org/apache/gluten/execution/SerializedBroadcastHashTable.scala:
##########
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gluten.execution
+
+import org.apache.gluten.vectorized.HashJoinBuilder
+
+import org.apache.spark.sql.execution.joins.BuildSideRelation
+import org.apache.spark.sql.execution.unsafe.UnsafeByteArray
+
+import java.io.{Externalizable, ObjectInput, ObjectOutput}
+
+/**
+ * Serialized broadcast hash table that can be efficiently broadcast to 
executors. This is built on
+ * the driver and contains the serialized hash table data.
+ */
+case class SerializedBroadcastHashTable(
+    serializedData: UnsafeByteArray,
+    numRows: Long,
+    ignoreNullKeys: Boolean,
+    joinHasNullKeys: Boolean,
+    bloomFilterBlocksByteSize: Long,
+    hashProbeDynamicFiltersProduced: Long,
+    buildSideRelation: BuildSideRelation)
+  extends Externalizable {
+
+  def this() = this(null, 0, false, false, 0, 0, null) // Required for 
Externalizable
+
+  override def writeExternal(out: ObjectOutput): Unit = {
+    out.writeLong(numRows)
+    out.writeBoolean(ignoreNullKeys)
+    out.writeBoolean(joinHasNullKeys)
+    out.writeLong(bloomFilterBlocksByteSize)
+    out.writeLong(hashProbeDynamicFiltersProduced)
+    serializedData.writeExternal(out)
+    out.writeObject(buildSideRelation)
+  }
+
+  override def readExternal(in: ObjectInput): Unit = {
+    val numRows = in.readLong()
+    val ignoreNullKeys = in.readBoolean()
+    val joinHasNullKeys = in.readBoolean()
+    val bloomFilterBlocksByteSize = in.readLong()
+    val hashProbeDynamicFiltersProduced = in.readLong()
+    val data = new UnsafeByteArray()
+    data.readExternal(in)
+    val relation = in.readObject().asInstanceOf[BuildSideRelation]
+
+    // Use reflection to set final fields
+    val numRowsField = 
classOf[SerializedBroadcastHashTable].getDeclaredField("numRows")
+    numRowsField.setAccessible(true)
+    numRowsField.set(this, numRows)
+
+    val dataField = 
classOf[SerializedBroadcastHashTable].getDeclaredField("serializedData")
+    dataField.setAccessible(true)
+    dataField.set(this, data)
+
+    val relationField = 
classOf[SerializedBroadcastHashTable].getDeclaredField("buildSideRelation")
+    relationField.setAccessible(true)
+    relationField.set(this, relation)
+
+    val ignoreNullKeysField =
+      classOf[SerializedBroadcastHashTable].getDeclaredField("ignoreNullKeys")
+    ignoreNullKeysField.setAccessible(true)
+    ignoreNullKeysField.set(this, ignoreNullKeys)
+
+    val joinHasNullKeysField =
+      classOf[SerializedBroadcastHashTable].getDeclaredField("joinHasNullKeys")
+    joinHasNullKeysField.setAccessible(true)
+    joinHasNullKeysField.set(this, joinHasNullKeys)
+
+    val bloomFilterBlocksByteSizeField =
+      
classOf[SerializedBroadcastHashTable].getDeclaredField("bloomFilterBlocksByteSize")
+    bloomFilterBlocksByteSizeField.setAccessible(true)
+    bloomFilterBlocksByteSizeField.set(this, bloomFilterBlocksByteSize)
+
+    val hashProbeDynamicFiltersProducedField =
+      
classOf[SerializedBroadcastHashTable].getDeclaredField("hashProbeDynamicFiltersProduced")
+    hashProbeDynamicFiltersProducedField.setAccessible(true)
+    hashProbeDynamicFiltersProducedField.set(this, 
hashProbeDynamicFiltersProduced)
+  }
+
+  /**
+   * Deserialize the hash table on executor side. The serialized Velox hash 
table is already in a
+   * prepared, probe-ready form, so executor side only needs deserialization 
without re-running
+   * prepareJoinTable.
+   *
+   * @return
+   *   Hash table builder handle
+   */
+  def deserialize(): Long = {
+    HashJoinBuilder.deserializeHashTableDirect(
+      serializedData.address(),
+      Math.toIntExact(serializedData.size()),
+      ignoreNullKeys,
+      joinHasNullKeys)
+  }
+
+  /** Get the size of serialized data in bytes. */
+  def sizeInBytes: Long = serializedData.size()
+}
+
+object SerializedBroadcastHashTable {
+
+  /**
+   * Build and serialize a hash table on the driver.
+   *
+   * @param hashTableHandle
+   *   Handle to the built hash table
+   * @param buildSideRelation
+   *   The build side relation for metadata
+   * @return
+   *   Serialized broadcast hash table
+   */
+  def fromHashTable(
+      hashTableHandle: Long,
+      buildSideRelation: BuildSideRelation): SerializedBroadcastHashTable = {
+    try {
+      val serializedData = HashJoinBuilder
+        .serializeHashTableDirect(hashTableHandle)
+        .toUnsafeByteArray()
+      val numRows = serializedData.size()
+      val ignoreNullKeys = HashJoinBuilder
+        .getHashTableIgnoreNullKeys(hashTableHandle)

Review Comment:
   `numRows` is populated with `serializedData.size()` (bytes), not the number 
of build-side rows. This makes logs/metrics misleading (e.g., 
`rows=${result.numRows}`), and any downstream logic expecting a row count will 
be wrong. Consider plumbing the actual row count from `BroadcastExchangeExec` 
(e.g., `numOutputRows`) into `SerializedBroadcastHashTable` (or rename the 
field to reflect bytes).



##########
ep/build-velox/src/get-velox.sh:
##########
@@ -17,9 +17,9 @@
 set -exu
 
 CURRENT_DIR=$(cd "$(dirname "$BASH_SOURCE")"; pwd)
-VELOX_REPO=https://github.com/IBM/velox.git
-VELOX_BRANCH=dft-2026_06_06
-VELOX_ENHANCED_BRANCH=ibm-2026_06_06
+VELOX_REPO=https://github.com/JkSelf/velox.git
+VELOX_BRANCH=dft-2026_06_06-serialize-hashtable
+VELOX_ENHANCED_BRANCH=ibm-2026_06_06-serialize-hashtable

Review Comment:
   Defaulting the Velox source to a personal fork makes builds 
non-reproducible and introduces supply-chain risk. The script already supports 
`--velox_repo` / `--velox_branch` overrides for testing, so the defaults should 
keep pointing to the project's canonical Velox repo and branches 
(`https://github.com/IBM/velox.git`, `dft-2026_06_06` / `ibm-2026_06_06`).



##########
backends-velox/src/main/scala/org/apache/spark/sql/execution/unsafe/UnsafeColumnarBuildSideRelation.scala:
##########
@@ -93,7 +93,7 @@ class UnsafeColumnarBuildSideRelation(
     private var batches: Seq[UnsafeByteArray],
     private var safeBroadcastMode: SafeBroadcastMode,
     private var newBuildKeys: Seq[Expression],
-    private var offload: Boolean,
+    var offload: Boolean,

Review Comment:
   Exposing `offload` as a public `var` reduces encapsulation and allows 
external mutation of internal state. There is already an accessor (`def 
isOffload: Boolean = offload`), so callers can read it without making the field 
public.



##########
backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxSparkPlanExecApi.scala:
##########
@@ -886,6 +911,161 @@ class VeloxSparkPlanExecApi extends SparkPlanExecApi with 
Logging {
         offload,
         buildThreadsValue)
     }
+
+    // Check if we should build hash table on driver (Spark-native approach)
+    // Only do this for HashedRelationBroadcastMode and when offload is enabled
+    val shouldBuildOnDriver = 
VeloxConfig.get.enableDriverSideBroadcastHashTableBuild &&
+      mode.isInstanceOf[HashedRelationBroadcastMode] &&
+      offload
+
+    if (shouldBuildOnDriver) {

Review Comment:
   Driver-side hash table build is a significant behavior change (new broadcast 
relation type, new JNI serialization path, new caches), but there are no tests 
here asserting correctness and resource cleanup for the driver-side build path 
(including reuse-exchange scenarios and off-heap release). Please add targeted 
tests that exercise `driverSideBroadcastHashTableBuild=true` and validate both 
the join results and the cache cleanup.



##########
backends-velox/src/main/scala/org/apache/gluten/execution/SerializedBroadcastHashTable.scala:
##########
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gluten.execution
+
+import org.apache.gluten.vectorized.HashJoinBuilder
+
+import org.apache.spark.sql.execution.joins.BuildSideRelation
+import org.apache.spark.sql.execution.unsafe.UnsafeByteArray
+
+import java.io.{Externalizable, ObjectInput, ObjectOutput}
+
+/**
+ * Serialized broadcast hash table that can be efficiently broadcast to 
executors. This is built on
+ * the driver and contains the serialized hash table data.
+ */
+case class SerializedBroadcastHashTable(
+    serializedData: UnsafeByteArray,
+    numRows: Long,
+    ignoreNullKeys: Boolean,
+    joinHasNullKeys: Boolean,
+    bloomFilterBlocksByteSize: Long,
+    hashProbeDynamicFiltersProduced: Long,
+    buildSideRelation: BuildSideRelation)
+  extends Externalizable {
+
+  def this() = this(null, 0, false, false, 0, 0, null) // Required for 
Externalizable
+
+  override def writeExternal(out: ObjectOutput): Unit = {
+    out.writeLong(numRows)
+    out.writeBoolean(ignoreNullKeys)
+    out.writeBoolean(joinHasNullKeys)
+    out.writeLong(bloomFilterBlocksByteSize)
+    out.writeLong(hashProbeDynamicFiltersProduced)
+    serializedData.writeExternal(out)
+    out.writeObject(buildSideRelation)
+  }
+
+  override def readExternal(in: ObjectInput): Unit = {
+    val numRows = in.readLong()
+    val ignoreNullKeys = in.readBoolean()
+    val joinHasNullKeys = in.readBoolean()
+    val bloomFilterBlocksByteSize = in.readLong()
+    val hashProbeDynamicFiltersProduced = in.readLong()
+    val data = new UnsafeByteArray()
+    data.readExternal(in)
+    val relation = in.readObject().asInstanceOf[BuildSideRelation]
+
+    // Use reflection to set final fields
+    val numRowsField = 
classOf[SerializedBroadcastHashTable].getDeclaredField("numRows")
+    numRowsField.setAccessible(true)
+    numRowsField.set(this, numRows)
+

Review Comment:
   `SerializedBroadcastHashTable` is a case class but implements 
`Externalizable` by mutating its constructor vals via reflection in 
`readExternal`. This is brittle (depends on field names/Scala compilation 
details) and can break under JVM access restrictions. Prefer a regular class 
with `var` fields (or a custom `readResolve`/serializer) instead of reflective 
mutation.



##########
cpp/velox/operators/hashjoin/HashTableBuilder.cc:
##########
@@ -181,7 +181,7 @@ bool HashTableBuilder::abandonHashBuildDedupEarly(int64_t 
numDistinct) const {
 
 void HashTableBuilder::abandonHashBuildDedup() {
   abandonHashBuildDedup_ = true;
-  uniqueTable_->setAllowDuplicates(true);
+  // uniqueTable_->setAllowDuplicates(true);
   lookup_.reset();
 }

Review Comment:
   `abandonHashBuildDedup()` previously switched the hash table to allow 
duplicates after abandoning the de-duplication path. Commenting out 
`uniqueTable_->setAllowDuplicates(true)` changes that behavior and can break join 
semantics or trigger unexpected failures when duplicates are encountered. Restore 
the call, or explain why duplicates can no longer reach the table once 
de-duplication has been abandoned.



##########
backends-velox/src/main/scala/org/apache/gluten/execution/HashJoinExecTransformer.scala:
##########
@@ -177,9 +178,69 @@ case class BroadcastHashJoinExecTransformer(
         buildBroadcastTableId,
         isNullAwareAntiJoin,
         bloomFilterPushdownSize,
-        metrics.get("buildHashTableTime")
+        metrics.get("buildHashTableTime"),
+        metrics.get("serializeHashTableTime"),
+        metrics.get("deserializeHashTableTime"),
+        metrics.get("serializedHashTableSize")
       )
-    val broadcastRDD = VeloxBroadcastBuildSideRDD(sparkContext, broadcast, 
context)
+
+    // Check the type of broadcast relation to determine the approach
+    val broadcastRDD = broadcast.value match {
+      case serializedRelation: SerializedHashTableBroadcastRelation =>
+        // Hash table was already built and serialized in 
BroadcastExchangeExec.
+        // Reuse the existing broadcast variable (no re-broadcast).
+        logInfo(
+          s"Using pre-built serialized hash table from BroadcastExchangeExec " 
+
+            s"for $buildBroadcastTableId")
+
+        val rdd = VeloxSerializedBroadcastRDD(sparkContext, broadcast, context)
+
+        // Update bloom filter metrics
+        val (bloomFilterSize, dynamicFiltersProduced) = 
rdd.getBloomFilterMetrics
+        
metrics.get("bloomFilterBlocksByteSize").foreach(_.set(bloomFilterSize))
+        
metrics.get("hashProbeDynamicFiltersProduced").foreach(_.set(dynamicFiltersProduced))
+
+        // Update size metric from the pre-built hash table
+        val (_, sizeInBytes, _, _) = serializedRelation.getMetrics
+        metrics.get("serializedHashTableSize").foreach(_.set(sizeInBytes))
+
+        rdd
+
+      case columnar: ColumnarBuildSideRelation =>
+        // Legacy path: ColumnarBuildSideRelation from BroadcastExchangeExec.
+        // Hash table is built on each executor from broadcast data.
+        val canOffload = columnar.offload
+
+        if (!canOffload) {
+          logWarning(
+            s"Build side cannot be offloaded for $buildBroadcastTableId, " +
+              "falling back to executor-side build")
+        } else {
+          logInfo(s"Using executor-side broadcast hash table build for 
$buildBroadcastTableId")
+        }
+        VeloxBroadcastBuildSideRDD(sparkContext, broadcast, context)
+
+      case unsafe: UnsafeColumnarBuildSideRelation =>
+        // Similar to ColumnarBuildSideRelation
+        val canOffload = unsafe.offload
+

Review Comment:
   This code relies on `unsafe.offload` being publicly accessible. Prefer the 
existing accessor (`unsafe.isOffload`) so that `offload` can stay private instead 
of becoming a public mutable field.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to