zhztheplayer commented on code in PR #11993:
URL: https://github.com/apache/gluten/pull/11993#discussion_r3427212313


##########
cpp/velox/jni/VeloxJniWrapper.cc:
##########
@@ -69,6 +69,11 @@ jmethodID blockStripesConstructor;
 jclass batchWriteMetricsClass;
 jmethodID batchWriteMetricsConstructor;
 
+jclass jniUnsafeByteBufferClass;
+jmethodID jniUnsafeByteBufferAllocate;
+jmethodID jniUnsafeByteBufferAddress;
+jmethodID jniUnsafeByteBufferSize;
+

Review Comment:
   nit: Why not create the byte buffer on the Java side and pass its address and 
size over JNI?
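
   A rough sketch of what the Java-side alternative could look like, assuming a
   direct `ByteBuffer` is acceptable here. `JavaSideBufferSketch`,
   `serializeInto` and `allocateAndFill` are hypothetical names, not part of
   this PR, and the plain-Java `serializeInto` only stands in for a real
   `native` method so the snippet runs without a native library:

```java
import java.nio.ByteBuffer;

final class JavaSideBufferSketch {
  // Hypothetical stand-in for a JNI entry point. A real version would be declared
  // `static native int serializeInto(ByteBuffer out);` and the C++ side would read
  // the address and size via GetDirectBufferAddress / GetDirectBufferCapacity.
  static int serializeInto(ByteBuffer out, byte[] payload) {
    if (!out.isDirect()) {
      throw new IllegalArgumentException("a direct buffer is required");
    }
    if (payload.length > out.capacity()) {
      throw new IllegalArgumentException("buffer too small");
    }
    out.clear();
    out.put(payload);
    return payload.length; // number of bytes written
  }

  // The Java side owns the allocation; native code only fills the memory.
  static ByteBuffer allocateAndFill(byte[] payload) {
    ByteBuffer buf = ByteBuffer.allocateDirect(payload.length);
    int written = serializeInto(buf, payload);
    buf.position(0);
    buf.limit(written);
    return buf;
  }
}
```

   With this shape the native side would not need to cache extra `jclass` /
   `jmethodID` handles just to call back into Java for allocation. It does
   assume the size is known before the call, which may not hold for the
   serialized hash table.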



##########
backends-velox/src/main/scala/org/apache/gluten/execution/VeloxBroadcastBuildSideCache.scala:
##########
@@ -75,6 +97,107 @@ object VeloxBroadcastBuildSideCache
       )
   }
 
+  /**
+   * Build hash table on driver and serialize for broadcasting. This version is called from
+   * BroadcastExchangeExec and doesn't need a broadcast variable.
+   *
+   * This is the Spark-native approach where hash table is built in BroadcastExchangeExec.
+   */
+  def buildAndSerializeOnDriverInBroadcastExchange(
+      relation: BuildSideRelation,
+      broadcastContext: BroadcastHashJoinContext,
+      numRows: Long): SerializedBroadcastHashTable = {
+
+    val broadcastId = broadcastContext.buildHashTableId
+
+    val cached = driverSerializedCache.getIfPresent(broadcastId)
+    if (cached != null) {
+      logInfo(s"Reusing cached serialized hash table for broadcast ID: $broadcastId")
+      return cached
+    }
+
+    def resetRelation(): Unit = relation match {
+      case r: ColumnarBuildSideRelation => r.reset()
+      case r: UnsafeColumnarBuildSideRelation => r.reset()
+      case _ =>
+    }
+
+    relation.synchronized {
+      val cachedAfterLock = driverSerializedCache.getIfPresent(broadcastId)
+      if (cachedAfterLock != null) {
+        logInfo(s"Reusing cached serialized hash table for broadcast ID: $broadcastId (after lock)")
+        return cachedAfterLock
+      }
+
+      logInfo(
+        s"Building hash table on driver in BroadcastExchangeExec " +
+          s"for broadcast ID: $broadcastId")
+
+      val backendName = BackendsApiManager.getBackendName
+
+      val runtime = org.apache.gluten.runtime.Runtime.createStandalone(
+        backendName,
+        "DriverBroadcastHashTableBuild"
+      )

Review Comment:
   See whether we can use `TaskResources.runUnsafe` here instead. Example:

   https://github.com/apache/gluten/blob/223fd820b6d74962fd9e9d3fef75ba36acc9474a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxTransformerApi.scala#L116-L122



##########
docs/velox-configuration.md:
##########
@@ -26,7 +26,8 @@ nav_order: 16
 | spark.gluten.sql.columnar.backend.velox.cudf.memoryResource | ⚓ Static | async | GPU RMM memory resource. |
 | spark.gluten.sql.columnar.backend.velox.cudf.shuffleMaxPrefetchBytes | 🔄 Dynamic | 1028MB | Maximum bytes to prefetch in CPU memory during GPU shuffle read while waiting for GPU available. |
 | spark.gluten.sql.columnar.backend.velox.directorySizeGuess | ⚓ Static | 32KB | Deprecated, rename to spark.gluten.sql.columnar.backend.velox.footerEstimatedSize |
-| spark.gluten.sql.columnar.backend.velox.enableTimestampNtzValidation | 🔄 Dynamic | true | Enable validation fallback for TimestampNTZ type. When true (default), any plan containing TimestampNTZ will fall back to Spark execution. Set to false during development/testing of TimestampNTZ support to allow native execution. |
+| spark.gluten.sql.columnar.backend.velox.driverSideBroadcastHashTableBuild | 🔄 Dynamic | true | Enable driver-side broadcast hash table build. When enabled, the hash table is built and serialized on the driver, then broadcast to executors. When disabled, each executor builds its own hash table from the broadcast data. |
+| spark.gluten.sql.columnar.backend.velox.enableTimestampNtzValidation | 🔄 Dynamic | false | Enable validation fallback for TimestampNTZ type. When true, any plan containing TimestampNTZ will fall back to Spark execution. When false, allows native execution for TimestampNTZ scan. |

Review Comment:
   @JkSelf this seems like a valid point.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

