Copilot commented on code in PR #11993:
URL: https://github.com/apache/gluten/pull/11993#discussion_r3410635311
##########
cpp/velox/jni/VeloxJniWrapper.cc:
##########
@@ -1162,6 +1171,115 @@ JNIEXPORT void JNICALL
Java_org_apache_gluten_vectorized_HashJoinBuilder_clearHa
ObjectStore::release(tableHandler);
JNI_METHOD_END()
}
+
+JNIEXPORT jlong JNICALL
Java_org_apache_gluten_vectorized_HashJoinBuilder_serializeHashTable( // NOLINT
+ JNIEnv* env,
+ jclass,
+ jlong hashTableHandle) {
+ JNI_METHOD_START
+ auto builder =
ObjectStore::retrieve<gluten::HashTableBuilder>(hashTableHandle);
+ auto serialized = gluten::serializeHashTable(builder);
+ return gluten::getHashTableObjStore()->save(serialized);
+ JNI_METHOD_END(kInvalidObjectHandle)
+}
+
+JNIEXPORT jlong JNICALL
+Java_org_apache_gluten_vectorized_HashJoinBuilder_deserializeHashTableWithIgnoreNullKeys(
// NOLINT
+ JNIEnv* env,
+ jclass,
+ jbyteArray serializedData,
+ jboolean ignoreNullKeys,
+ jboolean joinHasNullKeys) {
+ JNI_METHOD_START
+
+ jsize dataSize = env->GetArrayLength(serializedData);
+ jbyte* dataPtr = env->GetByteArrayElements(serializedData, nullptr);
+
+ if (dataPtr == nullptr) {
+ throw gluten::GlutenException("Failed to get serialized data");
+ }
+
+ auto builder = gluten::deserializeHashTable(
+ reinterpret_cast<const uint8_t*>(dataPtr),
+ static_cast<size_t>(dataSize),
+ nullptr,
+ static_cast<bool>(ignoreNullKeys),
+ static_cast<bool>(joinHasNullKeys));
+
+ env->ReleaseByteArrayElements(serializedData, dataPtr, JNI_ABORT);
+
+ return gluten::getHashTableObjStore()->save(builder);
Review Comment:
`GetByteArrayElements` must be paired with `ReleaseByteArrayElements` on every
path, including when `deserializeHashTable(...)` throws; otherwise the JVM
`byte[]` stays pinned, or the native copy the JVM made of it is never freed.
This file already uses `getByteArrayElementsSafe` elsewhere; reusing it here
gives RAII-style release.
##########
cpp/velox/operators/hashjoin/HashTableSerializer.cc:
##########
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "operators/hashjoin/HashTableSerializer.h"
+#include <cstring>
+#include <sstream>
+#include "velox/common/base/Exceptions.h"
+
+namespace gluten {
+
+template <bool ignoreNullKeys>
+HashTableSerializer::SerializedHashTable HashTableSerializer::serialize(
+ const facebook::velox::exec::HashTable<ignoreNullKeys>* hashTable) {
+ VELOX_CHECK_NOT_NULL(hashTable, "Hash table cannot be null");
+
+ std::ostringstream oss(std::ios::binary);
+
+ hashTable->serialize(oss);
Review Comment:
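`std::ostringstream` is constructed with `std::ios::binary` only. The
constructor always ORs `std::ios::out` into the mode, so the stream is still
opened for output and the serialization buffer is not affected. Consider
writing `std::ios::binary | std::ios::out` (or default-constructing the stream)
purely to make the intent explicit.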
##########
backends-velox/src/main/scala/org/apache/gluten/execution/SerializedBroadcastHashTable.scala:
##########
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gluten.execution
+
+import org.apache.gluten.vectorized.HashJoinBuilder
+
+import org.apache.spark.sql.execution.joins.BuildSideRelation
+
+import java.io.{Externalizable, ObjectInput, ObjectOutput}
+
+/**
+ * Serialized broadcast hash table that can be efficiently broadcast to
executors. This is built on
+ * the driver and contains the serialized hash table data.
+ */
+case class SerializedBroadcastHashTable(
+ serializedData: Array[Byte],
+ numRows: Long,
+ ignoreNullKeys: Boolean,
+ joinHasNullKeys: Boolean,
+ bloomFilterBlocksByteSize: Long,
+ hashProbeDynamicFiltersProduced: Long,
+ buildSideRelation: BuildSideRelation)
+ extends Externalizable {
+
+ def this() = this(null, 0, false, false, 0, 0, null) // Required for
Externalizable
+
+ override def writeExternal(out: ObjectOutput): Unit = {
+ out.writeLong(numRows)
+ out.writeBoolean(ignoreNullKeys)
+ out.writeBoolean(joinHasNullKeys)
+ out.writeLong(bloomFilterBlocksByteSize)
+ out.writeLong(hashProbeDynamicFiltersProduced)
+ out.writeInt(serializedData.length)
+ out.write(serializedData)
+ out.writeObject(buildSideRelation)
+ }
+
+ override def readExternal(in: ObjectInput): Unit = {
+ val numRows = in.readLong()
+ val ignoreNullKeys = in.readBoolean()
+ val joinHasNullKeys = in.readBoolean()
+ val bloomFilterBlocksByteSize = in.readLong()
+ val hashProbeDynamicFiltersProduced = in.readLong()
+ val dataLength = in.readInt()
+ val data = new Array[Byte](dataLength)
+ in.readFully(data)
+ val relation = in.readObject().asInstanceOf[BuildSideRelation]
+
+ // Use reflection to set final fields
+ val numRowsField =
classOf[SerializedBroadcastHashTable].getDeclaredField("numRows")
+ numRowsField.setAccessible(true)
+ numRowsField.set(this, numRows)
+
+ val dataField =
classOf[SerializedBroadcastHashTable].getDeclaredField("serializedData")
+ dataField.setAccessible(true)
+ dataField.set(this, data)
+
+ val relationField =
classOf[SerializedBroadcastHashTable].getDeclaredField("buildSideRelation")
+ relationField.setAccessible(true)
+ relationField.set(this, relation)
+
+ val ignoreNullKeysField =
+ classOf[SerializedBroadcastHashTable].getDeclaredField("ignoreNullKeys")
+ ignoreNullKeysField.setAccessible(true)
+ ignoreNullKeysField.set(this, ignoreNullKeys)
+
+ val joinHasNullKeysField =
+ classOf[SerializedBroadcastHashTable].getDeclaredField("joinHasNullKeys")
+ joinHasNullKeysField.setAccessible(true)
+ joinHasNullKeysField.set(this, joinHasNullKeys)
+
+ val bloomFilterBlocksByteSizeField =
+
classOf[SerializedBroadcastHashTable].getDeclaredField("bloomFilterBlocksByteSize")
+ bloomFilterBlocksByteSizeField.setAccessible(true)
+ bloomFilterBlocksByteSizeField.set(this, bloomFilterBlocksByteSize)
+
+ val hashProbeDynamicFiltersProducedField =
+
classOf[SerializedBroadcastHashTable].getDeclaredField("hashProbeDynamicFiltersProduced")
+ hashProbeDynamicFiltersProducedField.setAccessible(true)
+ hashProbeDynamicFiltersProducedField.set(this,
hashProbeDynamicFiltersProduced)
+ }
+
+ /**
+ * Deserialize the hash table on executor side. The serialized Velox hash
table is already in a
+ * prepared, probe-ready form, so executor side only needs deserialization
without re-running
+ * prepareJoinTable.
+ *
+ * @return
+ * Hash table builder handle
+ */
+ def deserialize(): Long = {
+ HashJoinBuilder.deserializeHashTableWithIgnoreNullKeys(
+ serializedData,
+ ignoreNullKeys,
+ joinHasNullKeys)
+ }
+
+ /** Get the size of serialized data in bytes. */
+ def sizeInBytes: Long = serializedData.length.toLong
+}
+
+object SerializedBroadcastHashTable {
+
+ /**
+ * Build and serialize a hash table on the driver.
+ *
+ * @param hashTableHandle
+ * Handle to the built hash table
+ * @param buildSideRelation
+ * The build side relation for metadata
+ * @return
+ * Serialized broadcast hash table
+ */
+ def fromHashTable(
+ hashTableHandle: Long,
+ buildSideRelation: BuildSideRelation): SerializedBroadcastHashTable = {
+
+ // Serialize the hash table
+ val serializedHandle = HashJoinBuilder.serializeHashTable(hashTableHandle)
+
+ try {
+ // Get serialized data
+ val serializedData = HashJoinBuilder
+ .getSerializedData(serializedHandle)
+ val numRows = HashJoinBuilder
+ .getSerializedSize(serializedHandle)
+ val ignoreNullKeys = HashJoinBuilder
+ .getSerializedIgnoreNullKeys(serializedHandle)
+ val joinHasNullKeys = HashJoinBuilder
+ .getSerializedJoinHasNullKeys(serializedHandle)
Review Comment:
`numRows` is populated using `HashJoinBuilder.getSerializedSize`, but the
native method returns the serialized byte size (see JNI implementation). This
misreports bytes as row count and makes logs/metrics misleading.
##########
backends-velox/src/main/scala/org/apache/spark/sql/execution/ColumnarBuildSideRelation.scala:
##########
@@ -238,6 +238,91 @@ case class ColumnarBuildSideRelation(
}
}
+ /**
+ * Build hash table with provided runtime (for driver-side build). This
version doesn't rely on
+ * TaskContext and can be called from the driver.
+ */
+ def buildHashTableWithRuntime(
+ broadcastContext: BroadcastHashJoinContext,
+ runtime: org.apache.gluten.runtime.Runtime): (Long,
ColumnarBuildSideRelation) =
+ synchronized {
+ if (hashTableData == 0) {
+ val startTime = System.nanoTime()
+ val jniWrapper = ColumnarBatchSerializerJniWrapper.create(runtime)
+ val serializeHandle: Long = {
+ val allocator = ArrowBufferAllocators.globalInstance()
+ val cSchema = ArrowSchema.allocateNew(allocator)
+ val arrowSchema = SparkArrowUtil.toArrowSchema(
+ SparkShimLoader.getSparkShims.structFromAttributes(output),
+ SQLConf.get.sessionLocalTimeZone)
+ ArrowAbiUtil.exportSchema(allocator, arrowSchema, cSchema)
+ val handle = jniWrapper
+ .init(cSchema.memoryAddress())
+ cSchema.close()
+ handle
+ }
+
+ val batchArray = new ArrayBuffer[Long]
+
+ var batchId = 0
+ while (batchId < batches.size) {
+ batchArray.append(jniWrapper.deserialize(serializeHandle,
batches(batchId)))
+ batchId += 1
+ }
+
+ logDebug(
+ s"BHJ value size: " +
+ s"${broadcastContext.buildHashTableId} = ${batches.length}")
+
+ val (keys, newOutput) = if (newBuildKeys.isEmpty) {
+ (
+ broadcastContext.buildSideJoinKeys.asJava,
+ broadcastContext.buildSideStructure.asJava
+ )
+ } else {
+ (
+ newBuildKeys.asJava,
+ output.asJava
+ )
+ }
+
+ val joinKeys = keys.asScala.map {
+ key =>
+ val attr = ConverterUtils.getAttrFromExpr(key)
+ ConverterUtils.genColumnNameWithExprId(attr)
+ }.toArray
+
+ val hashJoinBuilder = HashJoinBuilder.create(runtime)
+
+ // Build the hash table
+ hashTableData = hashJoinBuilder
+ .nativeBuild(
+ broadcastContext.buildHashTableId,
+ batchArray.toArray,
+ joinKeys,
+ broadcastContext.filterBuildColumns,
+ broadcastContext.filterPropagatesNulls,
+ broadcastContext.substraitJoinType.ordinal(),
+ broadcastContext.hasMixedFiltCondition,
+ broadcastContext.isExistenceJoin,
+ SubstraitUtil.toNameStruct(newOutput).toByteArray,
+ broadcastContext.isNullAwareAntiJoin,
+ broadcastContext.bloomFilterPushdownSize,
+ buildThreads
+ )
+
+ jniWrapper.close(serializeHandle)
+
Review Comment:
`serializeHandle` is closed only on the success path. If `nativeBuild(...)`
throws, the serializer handle (and potentially native resources) will leak.
Wrap the build in a try/finally to ensure `jniWrapper.close(serializeHandle)`
always runs.
##########
backends-velox/src/main/scala/org/apache/gluten/execution/VeloxBroadcastBuildSideCache.scala:
##########
@@ -75,6 +86,108 @@ object VeloxBroadcastBuildSideCache
)
}
+ /**
+ * Build hash table on driver and serialize for broadcasting. This version
is called from
+ * BroadcastExchangeExec and doesn't need a broadcast variable.
+ *
+ * This is the Spark-native approach where hash table is built in
BroadcastExchangeExec.
+ */
+ def buildAndSerializeOnDriverInBroadcastExchange(
+ relation: BuildSideRelation,
+ broadcastContext: BroadcastHashJoinContext):
SerializedBroadcastHashTable = {
+
+ val broadcastId = broadcastContext.buildHashTableId
+
+ val cached = driverSerializedCache.getIfPresent(broadcastId)
+ if (cached != null) {
+ logInfo(s"Reusing cached serialized hash table for broadcast ID:
$broadcastId")
+ return cached
+ }
+
+ def resetRelation(): Unit = relation match {
+ case r: ColumnarBuildSideRelation => r.reset()
+ case r: UnsafeColumnarBuildSideRelation => r.reset()
+ case _ =>
+ }
+
+ relation.synchronized {
+ val cachedAfterLock = driverSerializedCache.getIfPresent(broadcastId)
+ if (cachedAfterLock != null) {
+ logInfo(s"Reusing cached serialized hash table for broadcast ID:
$broadcastId (after lock)")
+ return cachedAfterLock
+ }
+
+ logInfo(
+ s"Building hash table on driver in BroadcastExchangeExec " +
+ s"for broadcast ID: $broadcastId")
+
+ val backendName = BackendsApiManager.getBackendName
+
+ val runtime = org.apache.gluten.runtime.Runtime.createStandalone(
+ backendName,
+ "DriverBroadcastHashTableBuild"
+ )
+
+ try {
+ resetRelation()
+ val startBuildTime = System.currentTimeMillis()
+ val (hashTableHandle, _) = relation match {
+ case r: ColumnarBuildSideRelation =>
+ r.buildHashTableWithRuntime(broadcastContext, runtime)
+ case r: UnsafeColumnarBuildSideRelation =>
+ r.buildHashTableWithRuntime(broadcastContext, runtime)
+ case other =>
+ throw new IllegalArgumentException(
+ s"Unsupported relation type for driver-side build:
${other.getClass.getName}")
+ }
+ try {
+ val buildTimeMs = System.currentTimeMillis() - startBuildTime
+ broadcastContext.buildHashTableTimeMetric.foreach(_ += buildTimeMs)
+ val startSerializeTime = System.currentTimeMillis()
Review Comment:
`buildHashTableTimeMetric` is updated inside
`buildHashTableWithRuntime(...)` already; updating it again here double-counts
driver build time.
##########
cpp/velox/operators/hashjoin/HashTableSerializer.cc:
##########
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "operators/hashjoin/HashTableSerializer.h"
+#include <cstring>
+#include <sstream>
+#include "velox/common/base/Exceptions.h"
+
+namespace gluten {
+
+template <bool ignoreNullKeys>
+HashTableSerializer::SerializedHashTable HashTableSerializer::serialize(
+ const facebook::velox::exec::HashTable<ignoreNullKeys>* hashTable) {
+ VELOX_CHECK_NOT_NULL(hashTable, "Hash table cannot be null");
+
+ std::ostringstream oss(std::ios::binary);
+
+ hashTable->serialize(oss);
+
+ SerializedHashTable result;
+ std::string str = oss.str();
+ result.size = str.size();
+ result.data = std::make_unique<uint8_t[]>(result.size);
+ std::memcpy(result.data.get(), str.data(), result.size);
+
+ return result;
+}
+
+template <bool ignoreNullKeys>
+std::unique_ptr<facebook::velox::exec::HashTable<ignoreNullKeys>>
+HashTableSerializer::deserialize(const uint8_t* data, size_t size,
facebook::velox::memory::MemoryPool* pool) {
+ VELOX_CHECK_NOT_NULL(data, "Serialized data cannot be null");
+ VELOX_CHECK_GT(size, 0, "Invalid serialized data size");
+ VELOX_CHECK_NOT_NULL(pool, "Memory pool cannot be null");
+
+ std::string str(reinterpret_cast<const char*>(data), size);
+ std::istringstream iss(str, std::ios::binary);
+
+ return facebook::velox::exec::HashTable<ignoreNullKeys>::deserialize(iss,
pool);
Review Comment:
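`std::istringstream` is constructed with `std::ios::binary` only. The
constructor always ORs `std::ios::in` into the mode, so the stream is still
opened for reading. Consider writing `std::ios::binary | std::ios::in` purely
to make the intent explicit.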
##########
backends-velox/src/main/scala/org/apache/gluten/metrics/JoinMetricsUpdater.scala:
##########
@@ -129,8 +130,16 @@ class HashJoinMetricsUpdater(override val metrics:
Map[String, SQLMetric])
hashProbeSpilledPartitions += hashProbeMetrics.spilledPartitions
hashProbeSpilledFiles += hashProbeMetrics.spilledFiles
hashProbeReplacedWithDynamicFilterRows +=
hashProbeMetrics.numReplacedWithDynamicFilterRows
- hashProbeDynamicFiltersProduced +=
hashProbeMetrics.numDynamicFiltersProduced
- bloomFilterBlocksByteSize += hashProbeMetrics.bloomFilterBlocksByteSize
+
+ // Only accumulate dynamic filter metrics when driver-side build is
disabled.
+ // When driver-side build is enabled, these metrics are set directly from
the
+ // serialized hash table in HashJoinExecTransformer to avoid double
counting.
+ val isDriverSideBuildEnabled =
+ VeloxConfig.get.enableDriverSideBroadcastHashTableBuild
+ if (!isDriverSideBuildEnabled) {
+ hashProbeDynamicFiltersProduced +=
hashProbeMetrics.numDynamicFiltersProduced
+ bloomFilterBlocksByteSize += hashProbeMetrics.bloomFilterBlocksByteSize
+ }
Review Comment:
Dynamic filter metrics are skipped based solely on the global config flag.
If driver-side build is enabled but this particular join falls back to
executor-side build (e.g. offload=false, non-hash broadcast mode, missing join
context), the metrics will incorrectly remain 0.
##########
backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxSparkPlanExecApi.scala:
##########
@@ -886,6 +911,161 @@ class VeloxSparkPlanExecApi extends SparkPlanExecApi with
Logging {
offload,
buildThreadsValue)
}
+
+ // Check if we should build hash table on driver (Spark-native approach)
+ // Only do this for HashedRelationBroadcastMode and when offload is enabled
+ val shouldBuildOnDriver =
VeloxConfig.get.enableDriverSideBroadcastHashTableBuild &&
+ mode.isInstanceOf[HashedRelationBroadcastMode] &&
+ offload
+
+ if (shouldBuildOnDriver) {
+ // Try to get broadcast join context from logical plan tag
+ // In multi-join scenarios, there may be multiple contexts. Find the one
that matches
+ // the current broadcast child's output.
+ val joinContextOpt:
Option[org.apache.gluten.extension.BroadcastJoinContextInfo] =
+ findLogicalLink(child).flatMap {
+ logicalPlan =>
+ logicalPlan.getTagValue(
+
org.apache.gluten.extension.BroadcastJoinContextTag.BROADCAST_JOIN_CONTEXT
+ ).flatMap {
+ contexts =>
+ val childOutputSet = AttributeSet(newOutput)
+ // Find the context whose build output matches the child's
output
+ contexts.find {
+ ctx =>
+ val buildOutputMatches =
childOutputSet.subsetOf(ctx.buildOutputSet) &&
+ ctx.buildOutputSet.subsetOf(childOutputSet)
+ buildOutputMatches
+ }
+ }
+ }
+
+ joinContextOpt match {
+ case Some(joinContext) =>
+ // We have join context information - build hash table on driver
+ logInfo(
+ s"Building hash table on driver in BroadcastExchangeExec " +
+ s"with join context: $joinContext")
+
+ // Create a broadcast ID for this hash table
+ val broadcastId =
s"broadcast_exchange_${child.id}_${System.identityHashCode(mode)}"
+
+ // Convert Spark JoinType to Substrait JoinType
+ val substraitJoinType = joinContext.joinType match {
+ case _: org.apache.spark.sql.catalyst.plans.InnerLike =>
+ JoinRel.JoinType.JOIN_TYPE_INNER
+ case org.apache.spark.sql.catalyst.plans.FullOuter =>
+ JoinRel.JoinType.JOIN_TYPE_OUTER
+ case org.apache.spark.sql.catalyst.plans.LeftOuter |
+ org.apache.spark.sql.catalyst.plans.RightOuter =>
+ JoinRel.JoinType.JOIN_TYPE_LEFT
+ case org.apache.spark.sql.catalyst.plans.LeftSemi |
+ org.apache.spark.sql.catalyst.plans.ExistenceJoin(_) =>
+ JoinRel.JoinType.JOIN_TYPE_LEFT_SEMI
+ case org.apache.spark.sql.catalyst.plans.LeftAnti =>
+ JoinRel.JoinType.JOIN_TYPE_LEFT_ANTI
+ case _ =>
+ JoinRel.JoinType.UNRECOGNIZED
+ }
Review Comment:
Join type conversion for driver-side hash table build currently maps both
LeftOuter and RightOuter to `JOIN_TYPE_LEFT`. This diverges from the existing
`needSwitchChildren`-aware conversion in `HashJoinExecTransformer` and can pass
the wrong join type into native hash table build when the build side is
switched.
##########
ep/build-velox/src/get-velox.sh:
##########
@@ -17,9 +17,9 @@
set -exu
CURRENT_DIR=$(cd "$(dirname "$BASH_SOURCE")"; pwd)
-VELOX_REPO=https://github.com/IBM/velox.git
-VELOX_BRANCH=dft-2026_06_06
-VELOX_ENHANCED_BRANCH=ibm-2026_06_06
+VELOX_REPO=https://github.com/JkSelf/velox.git
+VELOX_BRANCH=dft-2026_06_06-serialize-hashtable
+VELOX_ENHANCED_BRANCH=ibm-2026_06_06-serialize-hashtable
Review Comment:
The build script now defaults to cloning Velox from a personal fork
(`github.com/JkSelf/velox.git`). This is risky for reproducibility and
supply-chain/security; project build scripts should default to an
official/upstream repo (or clearly vendor the required patch), and only use
forks via explicit overrides.
##########
cpp/velox/operators/hashjoin/HashTableBuilder.cc:
##########
@@ -181,7 +181,7 @@ bool HashTableBuilder::abandonHashBuildDedupEarly(int64_t
numDistinct) const {
void HashTableBuilder::abandonHashBuildDedup() {
abandonHashBuildDedup_ = true;
- uniqueTable_->setAllowDuplicates(true);
+ // uniqueTable_->setAllowDuplicates(true);
lookup_.reset();
}
Review Comment:
`abandonHashBuildDedup()` resets `lookup_` so the build path will start
inserting rows directly. Commenting out `setAllowDuplicates(true)` leaves the
table configured for de-dup, which can keep paying duplicate-check costs (and
may violate assumptions of the non-dedup insert path).
##########
backends-velox/src/main/scala/org/apache/spark/sql/execution/unsafe/UnsafeColumnarBuildSideRelation.scala:
##########
@@ -208,6 +210,92 @@ class UnsafeColumnarBuildSideRelation(
}
}
+ /**
+ * Build hash table with provided runtime (for driver-side build). This
version doesn't rely on
+ * TaskContext and can be called from the driver.
+ */
+ def buildHashTableWithRuntime(
+ broadcastContext: BroadcastHashJoinContext,
+ runtime: org.apache.gluten.runtime.Runtime): (Long, BuildSideRelation) =
+ synchronized {
+ if (hashTableData == 0) {
+ val startTime = System.nanoTime()
+ val jniWrapper = ColumnarBatchSerializerJniWrapper.create(runtime)
+ val serializeHandle: Long = {
+ val allocator = ArrowBufferAllocators.globalInstance()
+ val cSchema = ArrowSchema.allocateNew(allocator)
+ val arrowSchema = SparkArrowUtil.toArrowSchema(
+ SparkShimLoader.getSparkShims.structFromAttributes(output),
+ SQLConf.get.sessionLocalTimeZone)
+ ArrowAbiUtil.exportSchema(allocator, arrowSchema, cSchema)
+ val handle = jniWrapper
+ .init(cSchema.memoryAddress())
+ cSchema.close()
+ handle
+ }
+
+ val batchArray = new ArrayBuffer[Long]
+
+ var batchId = 0
+ while (batchId < batches.size) {
+ val (offset, length) = (batches(batchId).address(),
batches(batchId).size())
+ batchArray.append(jniWrapper.deserializeDirect(serializeHandle,
offset, length.toInt))
+ batchId += 1
+ }
+
+ logDebug(
+ s"BHJ value size: " +
+ s"${broadcastContext.buildHashTableId} = ${batches.size}")
+
+ val (keys, newOutput) = if (newBuildKeys.isEmpty) {
+ (
+ broadcastContext.buildSideJoinKeys.asJava,
+ broadcastContext.buildSideStructure.asJava
+ )
+ } else {
+ (
+ newBuildKeys.asJava,
+ output.asJava
+ )
+ }
+
+ val joinKeys = keys.asScala.map {
+ key =>
+ val attr = ConverterUtils.getAttrFromExpr(key)
+ ConverterUtils.genColumnNameWithExprId(attr)
+ }.toArray
+
+ val hashJoinBuilder = HashJoinBuilder.create(runtime)
+
+ // Build the hash table
+ hashTableData = hashJoinBuilder
+ .nativeBuild(
+ broadcastContext.buildHashTableId,
+ batchArray.toArray,
+ joinKeys,
+ broadcastContext.filterBuildColumns,
+ broadcastContext.filterPropagatesNulls,
+ broadcastContext.substraitJoinType.ordinal(),
+ broadcastContext.hasMixedFiltCondition,
+ broadcastContext.isExistenceJoin,
+ SubstraitUtil.toNameStruct(newOutput).toByteArray,
+ broadcastContext.isNullAwareAntiJoin,
+ broadcastContext.bloomFilterPushdownSize,
+ buildThreads
+ )
+
+ jniWrapper.close(serializeHandle)
+
Review Comment:
`serializeHandle` is closed only on the success path. If `nativeBuild(...)`
throws, the serializer handle (and potentially native resources) will leak.
Wrap the build in a try/finally to ensure `jniWrapper.close(serializeHandle)`
always runs.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]