Copilot commented on code in PR #11993: URL: https://github.com/apache/gluten/pull/11993#discussion_r3419935177
########## backends-velox/src/main/scala/org/apache/gluten/execution/VeloxSerializedBroadcastRDD.scala: ########## @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gluten.execution + +import org.apache.spark.{broadcast, SparkContext} Review Comment: This import/type usage is not valid Scala for referencing the org.apache.spark.broadcast package (it will not compile as written). Use the proper Broadcast type import (e.g., org.apache.spark.broadcast.Broadcast) and declare broadcasted: Broadcast[BuildSideRelation], or import the package with `import org.apache.spark.broadcast` (without braces) if you want to reference broadcast.Broadcast. ########## cpp/velox/jni/JniHashTable.cc: ########## @@ -163,4 +164,89 @@ long getJoin(const std::string& hashTableId) { return JniHashTableContext::getInstance().callJavaGet(hashTableId); } +size_t serializedHashTableSize(std::shared_ptr<HashTableBuilder> builder) { + VELOX_CHECK_NOT_NULL(builder, "Hash table builder cannot be null"); + + auto hashTable = builder->hashTable(); + VELOX_CHECK_NOT_NULL(hashTable, "Hash table cannot be null"); + + auto* hashTableFalse = dynamic_cast<facebook::velox::exec::HashTable<false>*>(hashTable.get()); + if (hashTableFalse != nullptr) { + return HashTableSerializer::serializedSize<false>(hashTableFalse); + } + + auto* hashTableTrue = dynamic_cast<facebook::velox::exec::HashTable<true>*>(hashTable.get()); + VELOX_CHECK_NOT_NULL(hashTableTrue, "Hash table must be either HashTable<false> or HashTable<true>"); + return HashTableSerializer::serializedSize<true>(hashTableTrue); +} + +void serializeHashTableTo(std::shared_ptr<HashTableBuilder> builder, uint8_t* data, size_t size) { + VELOX_CHECK_NOT_NULL(builder, "Hash table builder cannot be null"); + VELOX_CHECK_NOT_NULL(data, "Serialized buffer cannot be null"); + + auto hashTable = builder->hashTable(); + VELOX_CHECK_NOT_NULL(hashTable, "Hash table cannot be null"); + + auto* hashTableFalse = dynamic_cast<facebook::velox::exec::HashTable<false>*>(hashTable.get()); + if (hashTableFalse != nullptr) { + HashTableSerializer::serializeTo<false>(hashTableFalse, data, size); + return; + } + + auto* hashTableTrue = dynamic_cast<facebook::velox::exec::HashTable<true>*>(hashTable.get()); + VELOX_CHECK_NOT_NULL(hashTableTrue, "Hash table must be either HashTable<false> or HashTable<true>"); + HashTableSerializer::serializeTo<true>(hashTableTrue, data, size); +} + +std::shared_ptr<HashTableBuilder> deserializeHashTable( + const uint8_t* data, + size_t size, + facebook::velox::memory::MemoryPool* memoryPool, + bool ignoreNullKeys, + bool joinHasNullKeys) { + VELOX_CHECK_NOT_NULL(data, "Serialized data cannot be null"); + VELOX_CHECK_GT(size, 0, "Invalid data size"); + + auto pool = memoryPool ? memoryPool->addLeafChild("deserializeHashTable") : defaultLeafVeloxMemoryPool(); + + std::unique_ptr<facebook::velox::exec::BaseHashTable> hashTable; + if (ignoreNullKeys) { + auto derived = HashTableSerializer::deserialize<true>(data, size, pool.get()); + hashTable = std::move(derived); + } else { + auto derived = HashTableSerializer::deserialize<false>(data, size, pool.get()); + hashTable = std::move(derived); + } + + std::vector<std::shared_ptr<const facebook::velox::core::FieldAccessTypedExpr>> emptyKeys; + std::vector<uint32_t> emptyChannels; + + auto keyTypes = hashTable->rows()->keyTypes(); + std::vector<std::string> names; + for (size_t i = 0; i < keyTypes.size(); ++i) { + names.push_back("key" + std::to_string(i)); + } + auto rowType = facebook::velox::ROW(std::move(names), std::move(keyTypes)); + + auto builder = std::make_shared<HashTableBuilder>( + facebook::velox::core::JoinType::kInner, + false, + false, + -1, + emptyKeys, + emptyChannels, + false, + rowType, + pool.get(), Review Comment: The deserialization path creates a local `pool` (shared_ptr) and passes `pool.get()` into HashTableBuilder, but the shared_ptr is destroyed when deserializeHashTable() returns. If HashTableBuilder stores the raw MemoryPool* (typical), this leaves a dangling pointer and can cause UAF in any later allocation/free. Ensure the memory pool has the same lifetime as the returned builder (e.g., store the shared_ptr alongside the builder in a holder saved to ObjectStore, pass an externally-owned pool from Runtime, or refactor HashTableBuilder construction to retain shared ownership). ########## backends-velox/src/main/scala/org/apache/gluten/execution/SerializedBroadcastHashTable.scala: ########## @@ -0,0 +1,143 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gluten.execution + +import org.apache.gluten.vectorized.HashJoinBuilder + +import org.apache.spark.sql.execution.joins.BuildSideRelation +import org.apache.spark.sql.execution.unsafe.UnsafeByteArray + +import java.io.{Externalizable, ObjectInput, ObjectOutput} + +/** + * Serialized broadcast hash table that can be efficiently broadcast to executors. This is built on + * the driver and contains the serialized hash table data. + */ +class SerializedBroadcastHashTable( + var serializedData: UnsafeByteArray, + var numRows: Long, + var ignoreNullKeys: Boolean, + var joinHasNullKeys: Boolean, + var bloomFilterBlocksByteSize: Long, + var hashProbeDynamicFiltersProduced: Long, + var buildSideRelation: BuildSideRelation) + extends Externalizable { + + def this() = this(null, 0, false, false, 0, 0, null) // Required for Externalizable + + override def writeExternal(out: ObjectOutput): Unit = { + out.writeLong(numRows) + out.writeBoolean(ignoreNullKeys) + out.writeBoolean(joinHasNullKeys) + out.writeLong(bloomFilterBlocksByteSize) + out.writeLong(hashProbeDynamicFiltersProduced) + serializedData.writeExternal(out) + out.writeObject(buildSideRelation) + } + + override def readExternal(in: ObjectInput): Unit = { + numRows = in.readLong() + ignoreNullKeys = in.readBoolean() + joinHasNullKeys = in.readBoolean() + bloomFilterBlocksByteSize = in.readLong() + hashProbeDynamicFiltersProduced = in.readLong() + val data = new UnsafeByteArray() + data.readExternal(in) + serializedData = data + buildSideRelation = in.readObject().asInstanceOf[BuildSideRelation] + } + + /** + * Deserialize the hash table on executor side. The serialized Velox hash table is already in a + * prepared, probe-ready form, so executor side only needs deserialization without re-running + * prepareJoinTable. + * + * @return + * Hash table builder handle + */ + def deserialize(): Long = { + HashJoinBuilder.deserializeHashTableDirect( + serializedData.address(), + Math.toIntExact(serializedData.size()), + ignoreNullKeys, + joinHasNullKeys) + } + + /** Get the size of serialized data in bytes. */ + def sizeInBytes: Long = serializedData.size() +} + +object SerializedBroadcastHashTable { + def apply( + serializedData: UnsafeByteArray, + numRows: Long, + ignoreNullKeys: Boolean, + joinHasNullKeys: Boolean, + bloomFilterBlocksByteSize: Long, + hashProbeDynamicFiltersProduced: Long, + buildSideRelation: BuildSideRelation): SerializedBroadcastHashTable = + new SerializedBroadcastHashTable( + serializedData, + numRows, + ignoreNullKeys, + joinHasNullKeys, + bloomFilterBlocksByteSize, + hashProbeDynamicFiltersProduced, + buildSideRelation) + + /** + * Build and serialize a hash table on the driver. + * + * @param hashTableHandle + * Handle to the built hash table + * @param buildSideRelation + * The build side relation for metadata + * @return + * Serialized broadcast hash table + */ + def fromHashTable( + hashTableHandle: Long, + buildSideRelation: BuildSideRelation, + numRows: Long): SerializedBroadcastHashTable = { + try { + val serializedData = HashJoinBuilder + .serializeHashTableDirect(hashTableHandle) + .toUnsafeByteArray() + val ignoreNullKeys = HashJoinBuilder + .getHashTableIgnoreNullKeys(hashTableHandle) + val joinHasNullKeys = HashJoinBuilder + .getHashTableJoinHasNullKeys(hashTableHandle) + + val bloomFilterBlocksByteSize = HashJoinBuilder + .getHashTableBloomFilterBlocksByteSize(hashTableHandle) + val hashProbeDynamicFiltersProduced = if (bloomFilterBlocksByteSize > 0) 1L else 0L Review Comment: Setting `hashProbeDynamicFiltersProduced` to `1` whenever bloom filter data exists is not a reliable measure of how many dynamic filters were produced (and conflates 'bloom filter exists' with 'filters produced'). This will make the reported metrics inaccurate. Prefer retrieving the exact count from native (add a JNI getter) or leave it unset/0 and continue accumulating the probe-side metrics even when using driver-side serialized hash tables. ########## docs/velox-configuration.md: ########## @@ -26,7 +26,8 @@ nav_order: 16 | spark.gluten.sql.columnar.backend.velox.cudf.memoryResource | ⚓ Static | async | GPU RMM memory resource. | | spark.gluten.sql.columnar.backend.velox.cudf.shuffleMaxPrefetchBytes | 🔄 Dynamic | 1028MB | Maximum bytes to prefetch in CPU memory during GPU shuffle read while waiting for GPU available. | | spark.gluten.sql.columnar.backend.velox.directorySizeGuess | ⚓ Static | 32KB | Deprecated, rename to spark.gluten.sql.columnar.backend.velox.footerEstimatedSize | -| spark.gluten.sql.columnar.backend.velox.enableTimestampNtzValidation | 🔄 Dynamic | true | Enable validation fallback for TimestampNTZ type. When true (default), any plan containing TimestampNTZ will fall back to Spark execution. Set to false during development/testing of TimestampNTZ support to allow native execution. | +| spark.gluten.sql.columnar.backend.velox.driverSideBroadcastHashTableBuild | 🔄 Dynamic | true | Enable driver-side broadcast hash table build. When enabled, the hash table is built and serialized on the driver, then broadcast to executors. When disabled, each executor builds its own hash table from the broadcast data. | +| spark.gluten.sql.columnar.backend.velox.enableTimestampNtzValidation | 🔄 Dynamic | false | Enable validation fallback for TimestampNTZ type. When true, any plan containing TimestampNTZ will fall back to Spark execution. When false, allows native execution for TimestampNTZ scan. | Review Comment: The documented default for `spark.gluten.sql.columnar.backend.velox.enableTimestampNtzValidation` is changed to `false` here, but the code in VeloxConfig still defines the default as `true` (per this PR’s diffs). Please align the doc default with the actual config default (or change the code default if that was intended), since this PR’s main scope is BHJ optimization and this mismatch will mislead users. ########## cpp/velox/jni/JniHashTable.cc: ########## @@ -163,4 +164,89 @@ long getJoin(const std::string& hashTableId) { return JniHashTableContext::getInstance().callJavaGet(hashTableId); } +size_t serializedHashTableSize(std::shared_ptr<HashTableBuilder> builder) { + VELOX_CHECK_NOT_NULL(builder, "Hash table builder cannot be null"); + + auto hashTable = builder->hashTable(); + VELOX_CHECK_NOT_NULL(hashTable, "Hash table cannot be null"); + + auto* hashTableFalse = dynamic_cast<facebook::velox::exec::HashTable<false>*>(hashTable.get()); + if (hashTableFalse != nullptr) { + return HashTableSerializer::serializedSize<false>(hashTableFalse); + } + + auto* hashTableTrue = dynamic_cast<facebook::velox::exec::HashTable<true>*>(hashTable.get()); + VELOX_CHECK_NOT_NULL(hashTableTrue, "Hash table must be either HashTable<false> or HashTable<true>"); + return HashTableSerializer::serializedSize<true>(hashTableTrue); +} + +void serializeHashTableTo(std::shared_ptr<HashTableBuilder> builder, uint8_t* data, size_t size) { + VELOX_CHECK_NOT_NULL(builder, "Hash table builder cannot be null"); + VELOX_CHECK_NOT_NULL(data, "Serialized buffer cannot be null"); + + auto hashTable = builder->hashTable(); + VELOX_CHECK_NOT_NULL(hashTable, "Hash table cannot be null"); + + auto* hashTableFalse = dynamic_cast<facebook::velox::exec::HashTable<false>*>(hashTable.get()); + if (hashTableFalse != nullptr) { + HashTableSerializer::serializeTo<false>(hashTableFalse, data, size); + return; + } + + auto* hashTableTrue = dynamic_cast<facebook::velox::exec::HashTable<true>*>(hashTable.get()); + VELOX_CHECK_NOT_NULL(hashTableTrue, "Hash table must be either HashTable<false> or HashTable<true>"); + HashTableSerializer::serializeTo<true>(hashTableTrue, data, size); +} + +std::shared_ptr<HashTableBuilder> deserializeHashTable( + const uint8_t* data, + size_t size, + facebook::velox::memory::MemoryPool* memoryPool, + bool ignoreNullKeys, + bool joinHasNullKeys) { + VELOX_CHECK_NOT_NULL(data, "Serialized data cannot be null"); + VELOX_CHECK_GT(size, 0, "Invalid data size"); + + auto pool = memoryPool ? memoryPool->addLeafChild("deserializeHashTable") : defaultLeafVeloxMemoryPool(); Review Comment: The deserialization path creates a local `pool` (shared_ptr) and passes `pool.get()` into HashTableBuilder, but the shared_ptr is destroyed when deserializeHashTable() returns. If HashTableBuilder stores the raw MemoryPool* (typical), this leaves a dangling pointer and can cause UAF in any later allocation/free. Ensure the memory pool has the same lifetime as the returned builder (e.g., store the shared_ptr alongside the builder in a holder saved to ObjectStore, pass an externally-owned pool from Runtime, or refactor HashTableBuilder construction to retain shared ownership). ########## backends-velox/src/main/scala/org/apache/gluten/execution/VeloxSerializedBroadcastRDD.scala: ########## @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gluten.execution + +import org.apache.spark.{broadcast, SparkContext} +import org.apache.spark.sql.execution.SerializedHashTableBroadcastRelation +import org.apache.spark.sql.execution.joins.BuildSideRelation +import org.apache.spark.sql.vectorized.ColumnarBatch + +/** + * RDD for handling serialized broadcast hash tables built on the driver. This RDD deserializes the + * hash table on each executor. + */ +case class VeloxSerializedBroadcastRDD( + @transient private val sc: SparkContext, + broadcasted: broadcast.Broadcast[BuildSideRelation], Review Comment: This import/type usage is not valid Scala for referencing the org.apache.spark.broadcast package (it will not compile as written). Use the proper Broadcast type import (e.g., org.apache.spark.broadcast.Broadcast) and declare broadcasted: Broadcast[BuildSideRelation], or import the package with `import org.apache.spark.broadcast` (without braces) if you want to reference broadcast.Broadcast. ########## backends-velox/src/main/scala/org/apache/gluten/execution/HashJoinExecTransformer.scala: ########## @@ -177,9 +178,73 @@ case class BroadcastHashJoinExecTransformer( buildBroadcastTableId, isNullAwareAntiJoin, bloomFilterPushdownSize, - metrics.get("buildHashTableTime") + metrics.get("buildHashTableTime"), + metrics.get("serializeHashTableTime"), + metrics.get("deserializeHashTableTime"), + metrics.get("serializedHashTableSize") Review Comment: The join context requests metrics keys like `serializeHashTableTime` (and possibly `serializedHashTableSize`) but the Velox hash-join metrics list in this PR only adds `deserializeHashTableTime`. If these metrics are intended to be reported at the join operator level, add them to the corresponding metrics generator; otherwise remove/rename these keys to avoid permanently-None metrics wiring and confusion. ########## ep/build-velox/src/get-velox.sh: ########## @@ -17,9 +17,9 @@ set -exu CURRENT_DIR=$(cd "$(dirname "$BASH_SOURCE")"; pwd) -VELOX_REPO=https://github.com/IBM/velox.git -VELOX_BRANCH=dft-2026_06_06 -VELOX_ENHANCED_BRANCH=ibm-2026_06_06 +VELOX_REPO=https://github.com/JkSelf/velox.git +VELOX_BRANCH=dft-2026_06_06-serialize-hashtable +VELOX_ENHANCED_BRANCH=ibm-2026_06_06-serialize-hashtable Review Comment: Switching the build script to a personal fork (`JkSelf/velox.git`) reduces build reproducibility and can introduce supply-chain risk for downstream users/CI. Prefer referencing an official upstream/org fork (or an internal mirror) and/or make the fork/branch configurable via environment variables while keeping the default pointed at the project’s standard Velox source. ########## cpp/velox/operators/hashjoin/HashTableBuilder.cc: ########## @@ -181,7 +181,7 @@ bool HashTableBuilder::abandonHashBuildDedupEarly(int64_t numDistinct) const { void HashTableBuilder::abandonHashBuildDedup() { abandonHashBuildDedup_ = true; - uniqueTable_->setAllowDuplicates(true); + // uniqueTable_->setAllowDuplicates(true); Review Comment: Commenting out `uniqueTable_->setAllowDuplicates(true)` changes behavior when abandoning dedup: the table may continue enforcing uniqueness or assume dedup invariants, which can lead to dropped build rows or incorrect join results. If dedup is abandoned, the builder should explicitly switch the underlying table to allow duplicates (or provide an equivalent safe fallback path) instead of silently skipping this step. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
