Copilot commented on code in PR #11993:
URL: https://github.com/apache/gluten/pull/11993#discussion_r3432489763
##########
backends-velox/src/main/scala/org/apache/spark/sql/execution/ColumnarBuildSideRelation.scala:
##########
@@ -209,6 +209,93 @@ case class ColumnarBuildSideRelation(
val hashJoinBuilder = HashJoinBuilder.create(runtime)
+ try {
+ // Build the hash table
+ hashTableData = hashJoinBuilder
+ .nativeBuild(
+ broadcastContext.buildHashTableId,
+ batchArray.toArray,
+ joinKeys,
+ broadcastContext.filterBuildColumns,
+ broadcastContext.filterPropagatesNulls,
+ broadcastContext.substraitJoinType.ordinal(),
+ broadcastContext.hasMixedFiltCondition,
+ broadcastContext.isExistenceJoin,
+ SubstraitUtil.toNameStruct(newOutput).toByteArray,
+ broadcastContext.isNullAwareAntiJoin,
+ broadcastContext.bloomFilterPushdownSize,
+ buildThreads
+ )
+ } finally {
+ jniWrapper.close(serializeHandle)
+ }
+
+ // Update build hash table time metric
+ val elapsedTime = System.nanoTime() - startTime
+ broadcastContext.buildHashTableTimeMetric.foreach(_ += elapsedTime /
1000000)
+
+ (hashTableData, this)
+ } else {
+ (HashJoinBuilder.cloneHashTable(hashTableData), null)
+ }
+ }
+
+ /**
+ * Build hash table with provided runtime (for driver-side build). This
version doesn't rely on
+ * TaskContext and can be called from the driver.
+ */
+ def buildHashTableWithRuntime(
+ broadcastContext: BroadcastHashJoinContext,
+ runtime: org.apache.gluten.runtime.Runtime): (Long,
ColumnarBuildSideRelation) =
+ synchronized {
+ if (hashTableData == 0) {
+ val startTime = System.nanoTime()
+ val jniWrapper = ColumnarBatchSerializerJniWrapper.create(runtime)
+ val serializeHandle: Long = {
+ val allocator = ArrowBufferAllocators.globalInstance()
+ val cSchema = ArrowSchema.allocateNew(allocator)
+ val arrowSchema = SparkArrowUtil.toArrowSchema(
+ SparkShimLoader.getSparkShims.structFromAttributes(output),
+ SQLConf.get.sessionLocalTimeZone)
+ ArrowAbiUtil.exportSchema(allocator, arrowSchema, cSchema)
+ val handle = jniWrapper
+ .init(cSchema.memoryAddress())
+ cSchema.close()
+ handle
+ }
+
+ val batchArray = new ArrayBuffer[Long]
+
+ var batchId = 0
+ while (batchId < batches.size) {
+ batchArray.append(jniWrapper.deserialize(serializeHandle,
batches(batchId)))
+ batchId += 1
+ }
+
+ logDebug(
+ s"BHJ value size: " +
+ s"${broadcastContext.buildHashTableId} = ${batches.length}")
+
+ val (keys, newOutput) = if (newBuildKeys.isEmpty) {
+ (
+ broadcastContext.buildSideJoinKeys.asJava,
+ broadcastContext.buildSideStructure.asJava
+ )
+ } else {
+ (
+ newBuildKeys.asJava,
+ output.asJava
+ )
+ }
+
+ val joinKeys = keys.asScala.map {
+ key =>
+ val attr = ConverterUtils.getAttrFromExpr(key)
+ ConverterUtils.genColumnNameWithExprId(attr)
+ }.toArray
+
+ val hashJoinBuilder = HashJoinBuilder.create(runtime)
+
// Build the hash table
hashTableData = hashJoinBuilder
.nativeBuild(
Review Comment:
`buildHashTableWithRuntime` needs to ensure `serializeHandle` is closed in a
`finally` block. Right now `nativeBuild(...)` can throw and skip the
`jniWrapper.close(serializeHandle)` later in the method, leaking the serializer
handle on the driver.
##########
backends-velox/src/main/scala/org/apache/gluten/execution/SerializedBroadcastHashTable.scala:
##########
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gluten.execution
+
+import org.apache.gluten.vectorized.HashJoinBuilder
+
+import org.apache.spark.sql.execution.joins.BuildSideRelation
+import org.apache.spark.sql.execution.unsafe.JniUnsafeByteBuffer
+import org.apache.spark.sql.execution.unsafe.UnsafeByteArray
+
+import java.io.{Externalizable, ObjectInput, ObjectOutput}
+
+/**
+ * Serialized broadcast hash table that can be efficiently broadcast to
executors. This is built on
+ * the driver and contains the serialized hash table data.
+ */
+class SerializedBroadcastHashTable(
+ var serializedData: UnsafeByteArray,
+ var numRows: Long,
+ var ignoreNullKeys: Boolean,
+ var joinHasNullKeys: Boolean,
+ var bloomFilterBlocksByteSize: Long,
+ var hashProbeDynamicFiltersProduced: Long,
+ var buildSideRelation: BuildSideRelation)
+ extends Externalizable {
+
+ def this() = this(null, 0, false, false, 0, 0, null) // Required for
Externalizable
+
+ override def writeExternal(out: ObjectOutput): Unit = {
+ out.writeLong(numRows)
+ out.writeBoolean(ignoreNullKeys)
+ out.writeBoolean(joinHasNullKeys)
+ out.writeLong(bloomFilterBlocksByteSize)
+ out.writeLong(hashProbeDynamicFiltersProduced)
+ serializedData.writeExternal(out)
+ out.writeObject(buildSideRelation)
+ }
+
+ override def readExternal(in: ObjectInput): Unit = {
+ numRows = in.readLong()
+ ignoreNullKeys = in.readBoolean()
+ joinHasNullKeys = in.readBoolean()
+ bloomFilterBlocksByteSize = in.readLong()
+ hashProbeDynamicFiltersProduced = in.readLong()
+ val data = new UnsafeByteArray()
+ data.readExternal(in)
+ serializedData = data
+ buildSideRelation = in.readObject().asInstanceOf[BuildSideRelation]
+ }
Review Comment:
`SerializedBroadcastHashTable.writeExternal` serializes `buildSideRelation`,
which (for the driver-build path) is typically the original
`ColumnarBuildSideRelation`/`UnsafeColumnarBuildSideRelation` and includes the
raw broadcast batches. That means executors receive *both* the serialized hash
table and the raw build-side data, largely defeating the intended
network/memory reduction of driver-side hash-table broadcast.
##########
ep/build-velox/src/get-velox.sh:
##########
@@ -17,9 +17,9 @@
set -exu
CURRENT_DIR=$(cd "$(dirname "$BASH_SOURCE")"; pwd)
-VELOX_REPO=https://github.com/IBM/velox.git
-VELOX_BRANCH=dft-2026_06_06
-VELOX_ENHANCED_BRANCH=ibm-2026_06_06
+VELOX_REPO=https://github.com/JkSelf/velox.git
+VELOX_BRANCH=dft-2026_06_06-serialize-hashtable
+VELOX_ENHANCED_BRANCH=ibm-2026_06_06-serialize-hashtable
Review Comment:
The default `VELOX_REPO` is switched to a personal fork
(`https://github.com/JkSelf/velox.git`). This is a supply-chain/operational
risk for the project (and can break reproducible builds if the fork/branch
disappears). Prefer an official upstream (e.g., IBM/velox) plus a pinned
commit/PR reference, or require contributors to override `--velox_repo`
explicitly for testing forks rather than changing the default.
##########
backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxSparkPlanExecApi.scala:
##########
@@ -886,6 +912,167 @@ class VeloxSparkPlanExecApi extends SparkPlanExecApi with
Logging {
offload,
buildThreadsValue)
}
+
+ // Check if we should build hash table on driver (Spark-native approach)
+ // Only do this for HashedRelationBroadcastMode and when offload is enabled
+ val shouldBuildOnDriver =
VeloxConfig.get.enableDriverSideBroadcastHashTableBuild &&
+ mode.isInstanceOf[HashedRelationBroadcastMode] &&
+ offload
+
+ if (shouldBuildOnDriver) {
+ // Try to get broadcast join context from logical plan tag
+ // In multi-join scenarios, there may be multiple contexts. Find the one
that matches
+ // the current broadcast child's output.
+ val joinContextOpt: Option[BroadcastJoinContextInfo] =
+ findLogicalLink(child).flatMap {
+ logicalPlan =>
+ logicalPlan.getTagValue(
+ BroadcastJoinContextTag.BROADCAST_JOIN_CONTEXT
+ ).flatMap {
+ contexts =>
+ val childOutputSet = AttributeSet(newOutput)
+ // Find the context whose build output matches the child's
output
+ contexts.find {
+ ctx =>
+ val buildOutputMatches =
childOutputSet.subsetOf(ctx.buildOutputSet) &&
+ ctx.buildOutputSet.subsetOf(childOutputSet)
+ buildOutputMatches
+ }
+ }
+ }
+
+ joinContextOpt match {
+ case Some(joinContext) =>
+ // We have join context information - build hash table on driver
+ logInfo(
+ s"Building hash table on driver in BroadcastExchangeExec " +
+ s"with join context: $joinContext")
+
+ // Create a broadcast ID for this hash table
+ val broadcastId =
s"broadcast_exchange_${child.id}_${System.identityHashCode(mode)}"
+
+ // Convert Spark JoinType to Substrait JoinType
+ val substraitJoinType = joinContext.joinType match {
+ case _: InnerLike =>
+ JoinRel.JoinType.JOIN_TYPE_INNER
+ case FullOuter =>
+ JoinRel.JoinType.JOIN_TYPE_OUTER
+ case LeftOuter |
+ RightOuter =>
+ JoinRel.JoinType.JOIN_TYPE_LEFT
+ case LeftSemi |
+ ExistenceJoin(_) =>
+ JoinRel.JoinType.JOIN_TYPE_LEFT_SEMI
+ case LeftAnti =>
+ JoinRel.JoinType.JOIN_TYPE_LEFT_ANTI
+ case _ =>
+ JoinRel.JoinType.UNRECOGNIZED
+ }
+
+ // Extract filter information from join condition
+ val (filterBuildColumns, filterPropagatesNulls,
hasMixedFiltCondition) =
+ joinContext.condition match {
+ case Some(cond) =>
+ val buildAttrs = joinContext.buildOutputSet
+ val cols: Array[String] = cond.references.toSeq.collect {
+ case a: Attribute if buildAttrs.contains(a) =>
+ ConverterUtils.genColumnNameWithExprId(a)
+ }.toArray
+ val propagatesNulls =
SparkShimLoader.getSparkShims.isNullIntolerant(cond)
+ (cols, propagatesNulls, true)
+ case None =>
+ (Array.empty[String], false, false)
+ }
+
+ // Calculate bloom filter pushdown size if enabled
+ val bloomFilterPushdownSize = if
(VeloxConfig.get.hashProbeDynamicFilterPushdownEnabled) {
+ VeloxConfig.get.hashProbeBloomFilterPushdownMaxSize
+ } else {
+ -1
+ }
+
+ // Use the join keys from the matched context
+ // Since we already matched the context by comparing outputs,
+ // we know this is the correct one
+ val joinKeys = if (joinContext.buildRight) {
+ joinContext.originalRightKeys
+ } else {
+ joinContext.originalLeftKeys
+ }
+ val buildContext = BroadcastHashJoinContext(
+ buildSideJoinKeys = if (newBuildKeys.nonEmpty) newBuildKeys else
joinKeys,
+ substraitJoinType = substraitJoinType,
+ buildRight = joinContext.buildRight,
+ hasMixedFiltCondition = hasMixedFiltCondition,
+ isExistenceJoin = joinContext.joinType
+ .isInstanceOf[ExistenceJoin],
+ buildSideStructure = newOutput,
+ filterBuildColumns = filterBuildColumns,
+ filterPropagatesNulls = filterPropagatesNulls,
+ buildHashTableId = broadcastId,
+ isNullAwareAntiJoin = joinContext.isNullAwareAntiJoin,
+ bloomFilterPushdownSize = bloomFilterPushdownSize,
+ buildHashTableTimeMetric = Option(buildHashTableTimeMetric),
+ serializeHashTableTimeMetric =
Option(serializeHashTableTimeMetric),
+ serializedHashTableSizeMetric =
Option(serializedHashTableSizeMetric)
+ )
+
+ try {
+ // Build and serialize hash table on driver
+ val (serializedHashTable, safeMode) = columnarRelation match {
+ case rel: ColumnarBuildSideRelation =>
+ (
+ VeloxBroadcastBuildSideCache
+ .buildAndSerializeOnDriverInBroadcastExchange(
+ rel,
+ buildContext,
+ buildSideRowCount),
+ rel.safeBroadcastMode
+ )
+ case rel: UnsafeColumnarBuildSideRelation =>
+ (
+ VeloxBroadcastBuildSideCache
+ .buildAndSerializeOnDriverInBroadcastExchange(
+ rel,
+ buildContext,
+ buildSideRowCount),
+ rel.getSafeBroadcastMode
+ )
+ }
+
+ logInfo(
+ s"Successfully built hash table on driver: " +
+ s"size=${serializedHashTable.sizeInBytes} bytes, " +
+ s"rows=${serializedHashTable.numRows}, " +
+ s"joinType=${joinContext.joinType}, " +
+ s"broadcastId=$broadcastId")
+
+ // Return SerializedHashTableBroadcastRelation
+ SerializedHashTableBroadcastRelation(
+ serializedHashTable,
+ safeMode,
+ newOutput,
+ 0L, // buildTimeMs - tracked inside SerializedBroadcastHashTable
+ 0L // serializeTimeMs - tracked inside
SerializedBroadcastHashTable
Review Comment:
The comments say `buildTimeMs` / `serializeTimeMs` are tracked inside
`SerializedBroadcastHashTable`, but that class currently doesn’t record these
timings. This is misleading (and the fields are always set to 0 here). Either
populate the fields with real timings or update the comments to reflect reality.
##########
backends-velox/src/main/scala/org/apache/gluten/execution/SerializedBroadcastHashTable.scala:
##########
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gluten.execution
+
+import org.apache.gluten.vectorized.HashJoinBuilder
+
+import org.apache.spark.sql.execution.joins.BuildSideRelation
+import org.apache.spark.sql.execution.unsafe.JniUnsafeByteBuffer
+import org.apache.spark.sql.execution.unsafe.UnsafeByteArray
+
+import java.io.{Externalizable, ObjectInput, ObjectOutput}
+
+/**
+ * Serialized broadcast hash table that can be efficiently broadcast to
executors. This is built on
+ * the driver and contains the serialized hash table data.
+ */
+class SerializedBroadcastHashTable(
+ var serializedData: UnsafeByteArray,
+ var numRows: Long,
+ var ignoreNullKeys: Boolean,
+ var joinHasNullKeys: Boolean,
+ var bloomFilterBlocksByteSize: Long,
+ var hashProbeDynamicFiltersProduced: Long,
+ var buildSideRelation: BuildSideRelation)
+ extends Externalizable {
+
+ def this() = this(null, 0, false, false, 0, 0, null) // Required for
Externalizable
+
+ override def writeExternal(out: ObjectOutput): Unit = {
+ out.writeLong(numRows)
+ out.writeBoolean(ignoreNullKeys)
+ out.writeBoolean(joinHasNullKeys)
+ out.writeLong(bloomFilterBlocksByteSize)
+ out.writeLong(hashProbeDynamicFiltersProduced)
+ serializedData.writeExternal(out)
+ out.writeObject(buildSideRelation)
+ }
+
+ override def readExternal(in: ObjectInput): Unit = {
+ numRows = in.readLong()
+ ignoreNullKeys = in.readBoolean()
+ joinHasNullKeys = in.readBoolean()
+ bloomFilterBlocksByteSize = in.readLong()
+ hashProbeDynamicFiltersProduced = in.readLong()
+ val data = new UnsafeByteArray()
+ data.readExternal(in)
+ serializedData = data
+ buildSideRelation = in.readObject().asInstanceOf[BuildSideRelation]
+ }
+
+ /**
+ * Deserialize the hash table on executor side. The serialized Velox hash
table is already in a
+ * prepared, probe-ready form, so executor side only needs deserialization
without re-running
+ * prepareJoinTable.
+ *
+ * @return
+ * Hash table builder handle
+ */
+ def deserialize(): Long = {
+ HashJoinBuilder.deserializeHashTableDirect(
+ serializedData.address(),
+ Math.toIntExact(serializedData.size()),
+ ignoreNullKeys,
+ joinHasNullKeys)
+ }
+
+ /** Get the size of serialized data in bytes. */
+ def sizeInBytes: Long = serializedData.size()
+}
+
+object SerializedBroadcastHashTable {
+ def apply(
+ serializedData: UnsafeByteArray,
+ numRows: Long,
+ ignoreNullKeys: Boolean,
+ joinHasNullKeys: Boolean,
+ bloomFilterBlocksByteSize: Long,
+ hashProbeDynamicFiltersProduced: Long,
+ buildSideRelation: BuildSideRelation): SerializedBroadcastHashTable =
+ new SerializedBroadcastHashTable(
+ serializedData,
+ numRows,
+ ignoreNullKeys,
+ joinHasNullKeys,
+ bloomFilterBlocksByteSize,
+ hashProbeDynamicFiltersProduced,
+ buildSideRelation)
+
+ /**
+ * Build and serialize a hash table on the driver.
+ *
+ * @param hashTableHandle
+ * Handle to the built hash table
+ * @param buildSideRelation
+ * The build side relation for metadata
+ * @return
+ * Serialized broadcast hash table
+ */
+ def fromHashTable(
+ hashTableHandle: Long,
+ buildSideRelation: BuildSideRelation,
+ numRows: Long): SerializedBroadcastHashTable = {
+ try {
+ val serializedSize =
HashJoinBuilder.serializedHashTableSizeDirect(hashTableHandle)
+ val byteBuffer = JniUnsafeByteBuffer.allocate(serializedSize)
+ HashJoinBuilder.serializeHashTableDirect(
+ hashTableHandle,
+ byteBuffer.address(),
+ byteBuffer.size())
+ val serializedData = byteBuffer.toUnsafeByteArray()
+ val ignoreNullKeys = HashJoinBuilder
+ .getHashTableIgnoreNullKeys(hashTableHandle)
+ val joinHasNullKeys = HashJoinBuilder
+ .getHashTableJoinHasNullKeys(hashTableHandle)
+
+ val bloomFilterBlocksByteSize = HashJoinBuilder
+ .getHashTableBloomFilterBlocksByteSize(hashTableHandle)
+ val hashProbeDynamicFiltersProduced = if (bloomFilterBlocksByteSize > 0)
1L else 0L
+
Review Comment:
`hashProbeDynamicFiltersProduced` is derived as `1` whenever *any* bloom
filter blocks exist. For multi-key joins, the number of produced dynamic
filters can be > 1 (one per hasher/key), so this under-reports the metric.
Consider exposing an explicit `numDynamicFiltersProduced` (or bloom-filter
count) from native via JNI and storing the exact value here.
##########
backends-velox/src/main/scala/org/apache/spark/sql/execution/SerializedHashTableBroadcastRelation.scala:
##########
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution
+
+import org.apache.gluten.execution.SerializedBroadcastHashTable
+
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression}
+import org.apache.spark.sql.catalyst.plans.physical.BroadcastMode
+import org.apache.spark.sql.execution.joins.BuildSideRelation
+import org.apache.spark.sql.vectorized.ColumnarBatch
+import org.apache.spark.util.KnownSizeEstimation
+
+/**
+ * Broadcast relation that contains a pre-built and serialized hash table.
This is similar to
+ * Spark's native HashedRelation broadcast approach where the hash table is
built once on the driver
+ * and broadcast to executors.
+ *
+ * Unlike ColumnarBuildSideRelation which broadcasts raw data and builds hash
table on each
+ * executor, this class broadcasts the serialized hash table directly, saving
CPU time on executors.
+ *
+ * @param serializedHashTable
+ * The serialized hash table built on driver
+ * @param mode
+ * The broadcast mode (HashedRelationBroadcastMode or IdentityBroadcastMode)
+ * @param output
+ * The output attributes
+ * @param buildTimeMs
+ * Time spent building hash table on driver (milliseconds)
+ * @param serializeTimeMs
+ * Time spent serializing hash table on driver (milliseconds)
+ */
+case class SerializedHashTableBroadcastRelation(
+ serializedHashTable: SerializedBroadcastHashTable,
+ safeBroadcastMode: SafeBroadcastMode,
+ output: Seq[Attribute],
+ buildTimeMs: Long,
+ serializeTimeMs: Long)
+ extends BuildSideRelation
+ with KnownSizeEstimation {
+
+ // Rebuild the real BroadcastMode on demand; never serialize it.
+ @transient override lazy val mode: BroadcastMode =
+ BroadcastModeUtils.fromSafe(safeBroadcastMode, output)
+
+ /**
+ * Returns an iterator of deserialized columnar batches. Note: This is not
the primary use case
+ * for this class. The main purpose is to provide the serialized hash table
directly to the join
+ * operator.
+ */
+ override def deserialized: Iterator[ColumnarBatch] = {
+ serializedHashTable.buildSideRelation match {
+ case _: SerializedHashTableBroadcastRelation =>
+ throw new IllegalStateException(
+ "Unexpected nested SerializedHashTableBroadcastRelation in
SerializedBroadcastHashTable")
+ case other =>
+ other.deserialized
+ }
+ }
+
+ override def asReadOnlyCopy(): SerializedHashTableBroadcastRelation = this
+
+ /**
+ * Get the serialized hash table for use in join operations. This is the
primary interface for
+ * consuming this broadcast relation.
+ */
+ def getSerializedHashTable: SerializedBroadcastHashTable =
serializedHashTable
+
+ /**
+ * Transform is used for DPP (Dynamic Partition Pruning) to extract keys. We
delegate to the
+ * underlying buildSideRelation in the serialized hash table.
+ */
+ override def transform(key: Expression): Array[InternalRow] = {
+ serializedHashTable.buildSideRelation.transform(key)
+ }
Review Comment:
If `buildSideRelation` is no longer serialized (to prevent
double-broadcast), executors will see `serializedHashTable.buildSideRelation ==
null`. The current `deserialized` / `transform` implementations will then throw
a NullPointerException. These methods should fail fast with a clear error (or
implement an executor-safe fallback).
##########
backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxMetricsApi.scala:
##########
@@ -715,7 +724,10 @@ class VeloxMetricsApi extends MetricsApi with Logging {
"time of loading lazy vectors"),
"buildHashTableTime" -> SQLMetrics.createTimingMetric(
sparkContext,
- "time to build hash table")
+ "time to build hash table"),
+ "deserializeHashTableTime" -> SQLMetrics.createTimingMetric(
+ sparkContext,
+ "time to deserialize hash table")
Review Comment:
`BroadcastHashJoinExecTransformer` now references join metrics
`serializeHashTableTime` and `serializedHashTableSize`, but
`genHashJoinTransformerMetrics` doesn’t define them. As a result, those metric
updates become silent no-ops and users won’t see these values in the UI.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]