rui-mo commented on code in PR #11392:
URL: https://github.com/apache/incubator-gluten/pull/11392#discussion_r2736360924


##########
backends-velox/src/test/scala/org/apache/gluten/execution/VeloxHashJoinSuite.scala:
##########
@@ -307,4 +307,75 @@ class VeloxHashJoinSuite extends VeloxWholeStageTransformerSuite {
       runQueryAndCompare(q5) { _ => }
     }
   }
+
+  test("Hash probe dynamic filter pushdown") {
+    withSQLConf(
+      VeloxConfig.HASH_PROBE_DYNAMIC_FILTER_PUSHDOWN_ENABLED.key -> "true",
+      VeloxConfig.HASH_PROBE_BLOOM_FILTER_PUSHDOWN_MAX_SIZE.key -> "1048576"
+    ) {
+      withTable("probe_table", "build_table") {
+        spark.sql("""
+        CREATE TABLE probe_table USING PARQUET
+        AS SELECT id as a FROM range(20000)
+      """)
+
+        spark.sql("""
+        CREATE TABLE build_table USING PARQUET
+        AS SELECT id * 1000 as b FROM range(10001)
+      """)
+
+        runQueryAndCompare(
+          "SELECT a FROM probe_table JOIN build_table ON a = b"
+        ) {
+          df =>
+            val join = find(df.queryExecution.executedPlan) {
+              case _: ShuffledHashJoinExecTransformer => true
+              case _ => false
+            }
+            assert(join.isDefined)
+            val metrics = join.get.metrics
+
+            assert(metrics.contains("hashProbeDynamicFiltersProduced"))
+            assert(metrics("hashProbeDynamicFiltersProduced").value > 0)
+
+            assert(metrics.contains("hashProbeReplacedWithDynamicFilterRows"))
+            assert(metrics("hashProbeReplacedWithDynamicFilterRows").value > 0)
+        }
+      }
+    }
+  }

Review Comment:
   @infvg I made a few changes to the test above and confirmed that bloom 
filter pushdown is working by printing the following metrics in Velox.
   
   ```
   bloomFilter->blocksByteSize(): 144704
   numFiltersProduced: 1
   ```
   
   A good next step would be to pass the `bloomFilter->blocksByteSize()` metric 
from Velox to Gluten (see `WholeStageResultIterator::collectMetrics()`), and 
then verify in this test that both `blocksByteSize` and `numFiltersProduced` 
are greater than 0.
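   
   Once that metric is wired through `collectMetrics()`, the verification could look roughly like the sketch below. The metric keys `numFiltersProduced` and `blocksByteSize` are placeholders here; the actual names depend on how the metrics get registered on the Gluten side:
   
   ```
   runQueryAndCompare(
     "SELECT a FROM probe_table JOIN build_table ON a = b"
   ) {
     df =>
       val join = find(df.queryExecution.executedPlan) {
         case _: ShuffledHashJoinExecTransformer => true
         case _ => false
       }
       assert(join.isDefined)
       val metrics = join.get.metrics
       // Placeholder metric keys: use whatever names
       // WholeStageResultIterator::collectMetrics() actually registers.
       assert(metrics.contains("numFiltersProduced"))
       assert(metrics("numFiltersProduced").value > 0)
       assert(metrics.contains("blocksByteSize"))
       assert(metrics("blocksByteSize").value > 0)
   }
   ```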
   
   This is the test with my modifications:
   ```
   withSQLConf(
     VeloxConfig.HASH_PROBE_DYNAMIC_FILTER_PUSHDOWN_ENABLED.key -> "true",
     VeloxConfig.HASH_PROBE_BLOOM_FILTER_PUSHDOWN_MAX_SIZE.key -> "1048576"
   ) {
     withTable("probe_table", "build_table") {
       spark.sql("""
         CREATE TABLE probe_table USING PARQUET
         AS SELECT id as a FROM range(110001)
       """)

       spark.sql("""
         CREATE TABLE build_table USING PARQUET
         AS SELECT id * 1000 as b FROM range(220002)
       """)

       runQueryAndCompare(
         "SELECT a FROM probe_table JOIN build_table ON a = b"
       ) {
         df =>
           val join = find(df.queryExecution.executedPlan) {
             case _: BroadcastHashJoinExecTransformer => true
             case _ => false
           }
           assert(join.isDefined)
           val metrics = join.get.metrics

           // TODO: assert the relevant metrics.
       }
     }
   }
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

