rui-mo commented on code in PR #11392:
URL: https://github.com/apache/incubator-gluten/pull/11392#discussion_r2736360924
##########
backends-velox/src/test/scala/org/apache/gluten/execution/VeloxHashJoinSuite.scala:
##########
@@ -307,4 +307,75 @@ class VeloxHashJoinSuite extends VeloxWholeStageTransformerSuite {
runQueryAndCompare(q5) { _ => }
}
}
+
+ test("Hash probe dynamic filter pushdown") {
+ withSQLConf(
+ VeloxConfig.HASH_PROBE_DYNAMIC_FILTER_PUSHDOWN_ENABLED.key -> "true",
+ VeloxConfig.HASH_PROBE_BLOOM_FILTER_PUSHDOWN_MAX_SIZE.key -> "1048576"
+ ) {
+ withTable("probe_table", "build_table") {
+ spark.sql("""
+ CREATE TABLE probe_table USING PARQUET
+ AS SELECT id as a FROM range(20000)
+ """)
+
+ spark.sql("""
+ CREATE TABLE build_table USING PARQUET
+ AS SELECT id * 1000 as b FROM range(10001)
+ """)
+
+ runQueryAndCompare(
+ "SELECT a FROM probe_table JOIN build_table ON a = b"
+ ) {
+ df =>
+ val join = find(df.queryExecution.executedPlan) {
+ case _: ShuffledHashJoinExecTransformer => true
+ case _ => false
+ }
+ assert(join.isDefined)
+ val metrics = join.get.metrics
+
+ assert(metrics.contains("hashProbeDynamicFiltersProduced"))
+ assert(metrics("hashProbeDynamicFiltersProduced").value > 0)
+
+ assert(metrics.contains("hashProbeReplacedWithDynamicFilterRows"))
+ assert(metrics("hashProbeReplacedWithDynamicFilterRows").value > 0)
+ }
+ }
+ }
Review Comment:
@infvg I made a few changes to the test above and confirmed that bloom
filter pushdown is working by printing the following metrics in Velox.
```
bloomFilter->blocksByteSize(): 144704
numFiltersProduced: 1
```
A good next step would be to pass the `bloomFilter->blocksByteSize()` metric
from Velox to Gluten (see `WholeStageResultIterator::collectMetrics()`), and
then verify in this test that both `blocksByteSize` and `numFiltersProduced`
are greater than 0.
This is the test with my modifications:
```
withSQLConf(
VeloxConfig.HASH_PROBE_DYNAMIC_FILTER_PUSHDOWN_ENABLED.key -> "true",
VeloxConfig.HASH_PROBE_BLOOM_FILTER_PUSHDOWN_MAX_SIZE.key -> "1048576"
) {
withTable("probe_table", "build_table") {
spark.sql("""
CREATE TABLE probe_table USING PARQUET
AS SELECT id as a FROM range(110001)
""")
spark.sql("""
CREATE TABLE build_table USING PARQUET
AS SELECT id * 1000 as b FROM range(220002)
""")
runQueryAndCompare(
"SELECT a FROM probe_table JOIN build_table ON a = b"
) {
df =>
val join = find(df.queryExecution.executedPlan) {
case _: BroadcastHashJoinExecTransformer => true
case _ => false
}
assert(join.isDefined)
val metrics = join.get.metrics
// TODO: assert the relevant metrics.
}
}
}
```
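For reference, a rough sketch of what the TODO assertions could look like once the byte-size metric is exposed, replacing the `val metrics = ...` line and the TODO above. Note the key `hashProbeBloomFilterBytes` is only a placeholder name, not an existing Gluten metric; the real key would depend on how the value from `bloomFilter->blocksByteSize()` gets registered in `WholeStageResultIterator::collectMetrics()`, and this also assumes the broadcast hash join transformer exposes the same probe-side metric keys as the shuffled hash join in the original test.
```
          val metrics = join.get.metrics

          // At least one dynamic filter should have been produced by the hash probe.
          assert(metrics.contains("hashProbeDynamicFiltersProduced"))
          assert(metrics("hashProbeDynamicFiltersProduced").value > 0)

          // Placeholder key for bloomFilter->blocksByteSize(); only valid once
          // that value is surfaced via WholeStageResultIterator::collectMetrics().
          assert(metrics.contains("hashProbeBloomFilterBytes"))
          assert(metrics("hashProbeBloomFilterBytes").value > 0)
```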