wangyum opened a new pull request #29065:
URL: https://github.com/apache/spark/pull/29065


   ### What changes were proposed in this pull request?
   
   We can improve the performance of some joins by pre-filtering one side of a 
join with the values from the other side. To verify that a Bloom Filter Join is 
effective, we first provide some evidence:
   
   1. Reducing shuffle data can improve performance.
      - Pre-filtering cannot improve performance for a broadcast join:
   
           | Default | pre-filter `ss_store_sk in (41, 543, 694)` |
           |:----------:|----------|
           | create table test.case1 using parquet as SELECT s1.* FROM store_sales s1 join store on s_store_sk = ss_store_sk AND s_state IN ('TN') AND s_zip < 30534 | create table test.case1 using parquet as SELECT s1.* FROM store_sales s1 join store on s_store_sk = ss_store_sk AND s_state IN ('TN') AND s_zip < 30534 and ss_store_sk in (41, 543, 694) |
           | <img src="https://user-images.githubusercontent.com/5399861/87167941-4f88c080-c300-11ea-99b7-1e25a2eb5808.png" width="410"> | <img src="https://user-images.githubusercontent.com/5399861/87167936-4d266680-c300-11ea-9e34-5d097c4cd4f0.png" width="410"> |
   
      - Pre-filtering can improve performance for a sort merge join (a manual sketch of this pre-filtering follows the table below):

           | Default | pre-filter `ss_store_sk in (41, 543, 694)` |
           |:----------:|----------|
           | create table test.case2 using parquet as SELECT s1.* FROM store_sales s1 join store on s_store_sk = ss_store_sk AND s_state IN ('TN') AND s_zip < 30534 | create table test.case2 using parquet as SELECT s1.* FROM store_sales s1 join store on s_store_sk = ss_store_sk AND s_state IN ('TN') AND s_zip < 30534 and ss_store_sk in (41, 543, 694) |
           | <img src="https://user-images.githubusercontent.com/5399861/87168933-c8d4e300-c301-11ea-9ec3-dd1019730bf1.png" width="410"> | <img src="https://user-images.githubusercontent.com/5399861/87168936-cb373d00-c301-11ea-97d1-49a2ca2aff3e.png" width="410"> |
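
      The effect in item 1 can be reproduced with Spark's public sketch API. Below is a minimal Scala sketch (not this PR's implementation) that builds a Bloom filter on the build side's join key and uses it to pre-filter the stream side before the shuffle; the `expectedNumItems`/`fpp` arguments and the assumption that `s_store_sk` is a `LONG` are illustrative:

      ```scala
      import org.apache.spark.sql.SparkSession
      import org.apache.spark.sql.functions.udf

      val spark = SparkSession.builder().appName("bloom-prefilter-sketch").getOrCreate()
      import spark.implicits._

      // Build side: the filtered `store` table from the queries above.
      val buildSide = spark.table("store").where("s_state IN ('TN') AND s_zip < 30534")

      // Build a Bloom filter over the build side's join key.
      // 1000 expected items and 3% FPP are illustrative, not tuned values.
      val bf = buildSide.stat.bloomFilter("s_store_sk", 1000L, 0.03)

      // Pre-filter the stream side before the join, so the sort merge join
      // shuffles less data. `mightContainLong` may return false positives,
      // so the original join condition is still applied afterwards.
      val mightContain = udf((k: Long) => bf.mightContainLong(k))
      val filteredSales = spark.table("store_sales").where(mightContain($"ss_store_sk"))

      val joined = filteredSales.join(buildSide, $"ss_store_sk" === $"s_store_sk")
      ```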
   
   
   2. It is difficult for dynamic Min-Max runtime filtering to be effective. For example, a Bloom filter can prune more data than a Min-Max filter for TPC-DS q95 (see the sketch after the table below):
       - Calculate the Min-Max values of `ws1.ws_ship_date_sk` based on `d_date BETWEEN '1999-02-01' AND (CAST('1999-02-01' AS DATE) + INTERVAL 60 DAY) AND ws1.ws_ship_date_sk = d_date_sk`:
      ```sql
      spark-sql> select min(d_date_sk), max(d_date_sk) from date_dim WHERE 
d_date BETWEEN '1999-02-01' AND (CAST('1999-02-01' AS DATE) + INTERVAL 60 DAY);
      2451211   2451271
      ```
      - Add the new predicate `ws1.ws_ship_date_sk >= 2451211 AND ws1.ws_ship_date_sk <= 2451271` for the Min-Max filter:
           | Default | Min-Max Filter | Bloom Filter |
           |:----------:|----------|----------|
           | <img src="https://user-images.githubusercontent.com/5399861/87174683-20774c80-c30a-11ea-96c1-72359aea650e.png" width="247"> | <img src="https://user-images.githubusercontent.com/5399861/87174529-e148fb80-c309-11ea-8076-49224e856e5e.png" width="247"> | <img src="https://user-images.githubusercontent.com/5399861/87174522-dc844780-c309-11ea-9062-ec2211884003.png" width="247"> |
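
      To see why a Bloom filter can prune more than a Min-Max filter: a key can fall inside the `[min, max]` range without being in the build-side key set. A self-contained Scala sketch using Spark's `org.apache.spark.util.sketch.BloomFilter` (the key values below are hypothetical, not taken from q95):

      ```scala
      import org.apache.spark.util.sketch.BloomFilter

      // Hypothetical sparse key set with gaps inside [2451211, 2451271].
      val keys = Seq(2451211L, 2451220L, 2451271L)
      val bf = BloomFilter.create(keys.size.toLong, 0.01)
      keys.foreach(bf.putLong)

      val probe = 2451230L // inside [min, max] but not in the key set
      val keptByMinMax = probe >= 2451211L && probe <= 2451271L // true: row survives
      val keptByBloom  = bf.mightContainLong(probe)             // false with high probability: row pruned

      println(s"Min-Max keeps: $keptByMinMax, Bloom keeps: $keptByBloom")
      ```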
   
   
   
   
   3. Evaluate dynamic Bloom Filter runtime filtering with TPC-DS:
   
   Query | Default (seconds) | Bloom Filter Join enabled (seconds)
   -- | -- | --
   tpcds q16 | 84 | 46
   tpcds q36 | 29 | 21
   tpcds q57 | 39 | 28
   tpcds q94 | 42 | 34
   tpcds q95 | 306 | 288
   
   
   
   TODO:
     1. `BuildBloomFilter` and `InBloomFilter` support codegen.
     2. Add a new `DynamicFilter` that supports filter pushdown.
     3. Reuse `BroadcastExchange`.
     4. Replace the Bloom filter with an `In` predicate if the number of values is less than `spark.sql.parquet.pushdown.inFilterThreshold` (a hedged sketch follows this list).
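
   For TODO item 4, a hedged Scala sketch of the intended fallback, assuming a hypothetical `preFilter` helper and a `LONG` join key; `threshold` stands in for `spark.sql.parquet.pushdown.inFilterThreshold`, and none of this is the PR's actual code:

   ```scala
   import org.apache.spark.sql.DataFrame
   import org.apache.spark.sql.functions.{col, udf}

   // Hypothetical helper: fall back to an exact `In` predicate (which
   // Parquet can push down) when the build side has few distinct keys;
   // otherwise use a Bloom filter.
   def preFilter(stream: DataFrame, build: DataFrame,
                 streamKey: String, buildKey: String,
                 threshold: Int): DataFrame = {
     // Collect at most threshold + 1 distinct keys to decide which path to take.
     val keys = build.select(buildKey).distinct().limit(threshold + 1)
       .collect().map(_.getLong(0)) // assumes a LONG key
     if (keys.length <= threshold) {
       stream.where(col(streamKey).isin(keys: _*)) // exact, pushdown-friendly
     } else {
       // 1000000 expected items and 3% FPP are illustrative values.
       val bf = build.stat.bloomFilter(buildKey, 1000000L, 0.03)
       val mightContain = udf((k: Long) => bf.mightContainLong(k))
       stream.where(mightContain(col(streamKey)))
     }
   }
   ```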
   
   ### Why are the changes needed?
   
   Improve query performance.
   
   ### Does this PR introduce _any_ user-facing change?
   
   No.
   
   
   ### How was this patch tested?
    
   TODO.
   

