Interesting analysis. 

Can you log a JIRA?

> On Apr 21, 2016, at 11:07 AM, atootoonchian <a...@levyx.com> wrote:
> 
> A SQL query planner can be intelligent enough to push filter operations down
> toward the storage layer. If we optimize the query planner so that I/O to
> storage is reduced at the cost of evaluating some filters more than once
> (i.e., extra compute), the trade-off is desirable whenever the system is
> I/O bound. A minimal sketch of observable pushdown is shown first, followed
> by an example from the TPCH test bench that proves the point.
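> 
> (A sketch only: the parquet path below is hypothetical and the calls are the
> Spark 1.6-era DataFrame API. When the data source supports pushdown, the
> pushed predicate shows up in the scan node of the physical plan.)
> 
> val df = sqlContext.read.parquet("/data/tpch/lineitem")
> df.filter("l_shipinstruct = 'DELIVER IN PERSON'").explain()
> // For a parquet source the scan node should list something like:
> //   PushedFilters: [EqualTo(l_shipinstruct,DELIVER IN PERSON)]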
> 
> Let’s look at query q19 of the TPCH test bench.
> select
>    sum(l_extendedprice* (1 - l_discount)) as revenue
> from lineitem, part
> where
>      ( p_partkey = l_partkey
>        and p_brand = 'Brand#12'
>        and p_container in ('SM CASE', 'SM BOX', 'SM PACK', 'SM PKG')
>        and l_quantity >= 1 and l_quantity <= 1 + 10
>        and p_size between 1 and 5
>        and l_shipmode in ('AIR', 'AIR REG')
>        and l_shipinstruct = 'DELIVER IN PERSON')
>      or
>      ( p_partkey = l_partkey
>        and p_brand = 'Brand#23'
>        and p_container in ('MED BAG', 'MED BOX', 'MED PKG', 'MED PACK')
>        and l_quantity >= 10 and l_quantity <= 10 + 10
>        and p_size between 1 and 10
>        and l_shipmode in ('AIR', 'AIR REG')
>        and l_shipinstruct = 'DELIVER IN PERSON')
>      or
>      ( p_partkey = l_partkey
>        and p_brand = 'Brand#34'
>        and p_container in ('LG CASE', 'LG BOX', 'LG PACK', 'LG PKG')
>        and l_quantity >= 20 and l_quantity <= 20 + 10
>        and p_size between 1 and 15
>        and l_shipmode in ('AIR', 'AIR REG')
>        and l_shipinstruct = 'DELIVER IN PERSON')
> 
> The latest version of Spark produces the following plan (simplified here for
> readability) to execute q19.
> Aggregate [(sum(cast((l_extendedprice * (1.0 - l_discount))
>   Project [l_extendedprice, l_discount]
>     Join Inner, Some((p_partkey = l_partkey) &&
>       ((((p_brand = Brand#12) &&
>          p_container IN (SM CASE,SM BOX,SM PACK,SM PKG) &&
>          (l_quantity >= 1.0) && (l_quantity <= 11.0) &&
>          (p_size <= 5)) ||
>         ((p_brand = Brand#23) &&
>          p_container IN (MED BAG,MED BOX,MED PKG,MED PACK) &&
>          (l_quantity >= 10.0) && (l_quantity <= 20.0) &&
>          (p_size <= 10))) ||
>        ((p_brand = Brand#34) &&
>          p_container IN (LG CASE,LG BOX,LG PACK,LG PKG) &&
>          (l_quantity >= 20.0) && (l_quantity <= 30.0) &&
>          (p_size <= 15))))
>       Project [l_partkey, l_quantity, l_extendedprice, l_discount]
>         Filter (isnotnull(l_partkey) && isnotnull(l_shipinstruct) &&
>                 l_shipmode IN (AIR,AIR REG) &&
>                 (l_shipinstruct = DELIVER IN PERSON))
>           LogicalRDD [l_orderkey, l_partkey, l_suppkey, l_linenumber,
>                       l_quantity, l_extendedprice, l_discount, l_tax,
>                       l_returnflag, l_linestatus, l_shipdate, l_commitdate,
>                       l_receiptdate, l_shipinstruct, l_shipmode, l_comment],
>                      MapPartitionsRDD[316]
>       Project [p_partkey, p_brand, p_size, p_container]
>         Filter (isnotnull(p_partkey) && isnotnull(p_size) &&
>                 (cast(cast(p_size as decimal(20,0)) as int) >= 1))
>           LogicalRDD [p_partkey, p_name, p_mfgr, p_brand, p_type, p_size,
>                       p_container, p_retailprice, p_comment],
>                      MapPartitionsRDD[314]
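> 
> (For reference, plans like the one above can be printed from the shell; a
> sketch, assuming the q19 SQL text is held in a val named q19Sql:)
> 
> val q19Df = sqlContext.sql(q19Sql) // q19Sql is an assumed name for the query string
> q19Df.explain(true)                // prints both the logical and physical plans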
> 
> As you can see, only three filter predicates are pushed down before the join
> is executed:
>  l_shipmode IN (AIR,AIR REG)
>  l_shipinstruct = DELIVER IN PERSON
>  (cast(cast(p_size as decimal(20,0)) as int) >= 1)
> 
> The following filters are applied during the join:
>  p_brand = Brand#12
>  p_container IN (SM CASE,SM BOX,SM PACK,SM PKG) 
>  l_quantity >= 1.0 && l_quantity <= 11.0 
>  p_size <= 5  
>  p_brand = Brand#23 
>  p_container IN (MED BAG,MED BOX,MED PKG,MED PACK) 
>  l_quantity >= 10.0 && l_quantity <= 20.0 
>  p_size <= 10 
>  p_brand = Brand#34 
>  p_container IN (LG CASE,LG BOX,LG PACK,LG PKG) 
>  l_quantity >= 20.0 && l_quantity <= 30.0
>  p_size <= 15
> 
> Now let’s look at the following sequence of SQL commands, which produces the
> same result.
> val partDfFilter = sqlContext.sql("""
>        |select p_brand, p_partkey from part 
>        |where
>        | (p_brand = 'Brand#12'
>        |   and p_container in ('SM CASE', 'SM BOX', 'SM PACK', 'SM PKG')
>        |   and p_size between 1 and 5)
>        | or
>        | (p_brand = 'Brand#23'
>        |   and p_container in ('MED BAG', 'MED BOX', 'MED PKG', 'MED PACK')
>        |   and p_size between 1 and 10)
>        | or
>        | (p_brand = 'Brand#34'
>        |   and p_container in ('LG CASE', 'LG BOX', 'LG PACK', 'LG PKG')
>        |   and p_size between 1 and 15)
>       """.stripMargin)
> 
> val itemLineDfFilter = sqlContext.sql("""
>        |select 
>        | l_partkey, l_quantity, l_extendedprice, l_discount from lineitem
>        |where
>        | (l_quantity >= 1 and l_quantity <= 30
>        |   and l_shipmode in ('AIR', 'AIR REG')
>        |   and l_shipinstruct = 'DELIVER IN PERSON')
>      """.stripMargin)
> 
> partDfFilter.registerTempTable("partFilter")
> itemLineDfFilter.registerTempTable("lineitemFilter")
> 
> val q19Query = """
>        |select
>        | sum(l_extendedprice* (1 - l_discount)) as revenue
>        |from
>        | lineitemFilter,
>        | partFilter
>        |where
>        | (p_partkey = l_partkey
>        |   and p_brand = 'Brand#12'
>        |   and l_quantity >= 1 and l_quantity <= 1 + 10)
>        | or
>        | ( p_partkey = l_partkey
>        |   and p_brand = 'Brand#23'
>        |   and l_quantity >= 10 and l_quantity <= 10 + 10)
>        | or
>        | ( p_partkey = l_partkey
>        |   and p_brand = 'Brand#34'
>        |   and l_quantity >= 20 and l_quantity <= 20 + 10)
>      """.stripMargin
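> 
> (To run the rewritten query and inspect its plan, something like the
> following should work with the definitions above:)
> 
> val q19ModDf = sqlContext.sql(q19Query)
> q19ModDf.explain(true) // prints the plan discussed below
> q19ModDf.show()        // the same single-row revenue result as the original q19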
> 
> The following plan shows how Spark will execute the new q19 query.
> Aggregate [(sum(cast((l_extendedprice * (1.0 - l_discount))
>   Project [l_extendedprice, l_discount]
>     Join Inner, Some((p_partkey = l_partkey) &&
>       ((((p_brand = Brand#12) &&
>          (l_quantity >= 1.0) && (l_quantity <= 11.0)) ||
>         ((p_brand = Brand#23) &&
>          (l_quantity >= 10.0) && (l_quantity <= 20.0))) ||
>        ((p_brand = Brand#34) &&
>          (l_quantity >= 20.0) && (l_quantity <= 30.0))))
>       Project [l_partkey, l_quantity, l_extendedprice, l_discount]
>         Filter (isnotnull(l_partkey) && isnotnull(l_shipinstruct) &&
>                 isnotnull(l_quantity) &&
>                 (cast(l_quantity as float) >= 1.0) &&
>                 (cast(l_quantity as float) <= 30.0) &&
>                 l_shipmode IN (AIR,AIR REG) &&
>                 (l_shipinstruct = DELIVER IN PERSON))
>           LogicalRDD [l_orderkey, l_partkey, l_suppkey, l_linenumber,
>                       l_quantity, l_extendedprice, l_discount, l_tax,
>                       l_returnflag, l_linestatus, l_shipdate, l_commitdate,
>                       l_receiptdate, l_shipinstruct, l_shipmode, l_comment],
>                      MapPartitionsRDD[316]
>       Project [p_partkey, p_brand, p_size, p_container]
>         Filter (isnotnull(p_partkey) &&
>                 isnotnull(cast(cast(p_partkey as decimal(20,0)) as int)) &&
>                 isnotnull(p_size) &&
>                 (cast(cast(p_size as decimal(20,0)) as int) >= 1) &&
>                 ((((p_brand = Brand#12) &&
>                    p_container IN (SM CASE,SM BOX,SM PACK,SM PKG) &&
>                    (cast(cast(p_size as decimal(20,0)) as int) <= 5)) ||
>                   ((p_brand = Brand#23) &&
>                    p_container IN (MED BAG,MED BOX,MED PKG,MED PACK) &&
>                    (cast(cast(p_size as decimal(20,0)) as int) <= 10))) ||
>                  ((p_brand = Brand#34) &&
>                   p_container IN (LG CASE,LG BOX,LG PACK,LG PKG) &&
>                   (cast(cast(p_size as decimal(20,0)) as int) <= 15))))
>           LogicalRDD [p_partkey, p_name, p_mfgr, p_brand, p_type, p_size,
>                       p_container, p_retailprice, p_comment],
>                      MapPartitionsRDD[314]
> 
> With the new approach, all of the filter predicates are pushed down below the join:
>  l_shipmode IN (AIR,AIR REG)
>  l_shipinstruct = DELIVER IN PERSON
>  (cast(cast(p_size as decimal(20,0)) as int) >= 1)
>  p_brand = Brand#12
>  p_container IN (SM CASE,SM BOX,SM PACK,SM PKG)
>  l_quantity >= 1.0 && l_quantity <= 11.0 
>  p_size <= 5 
>  p_brand = Brand#23 
>  p_container IN (MED BAG,MED BOX,MED PKG,MED PACK) 
>  l_quantity >= 10.0 && l_quantity <= 20.0 
>  p_size <= 10 
>  p_brand = Brand#34 
>  p_container IN (LG CASE,LG BOX,LG PACK,LG PKG) 
>  l_quantity >= 20.0 && l_quantity <= 30.0
>  p_size <= 15
> 
> However, some filter predicates still need to be evaluated during the join to
> distinguish the different sets of items. In other words, some filters are
> re-evaluated:
>  p_brand = Brand#12
>  l_quantity >= 1.0 && l_quantity <= 11.0 
>  p_brand = Brand#23 
>  l_quantity >= 10.0 && l_quantity <= 20.0 
>  p_brand = Brand#34 
>  l_quantity >= 20.0 && l_quantity <= 30.0
> 
> Our main goal in pushing filters down as far as possible is to minimize I/O
> and maximize processor utilization. So let’s compare the original q19 and the
> modified q19 from an I/O point of view.
> 
> +--------+-------+----------+--------------+---------------+----------+--------------+---------------+
> | TPCH   | Stage |                   Q19                   |              Q19 modified               |
> | Scale  |       +----------+--------------+---------------+----------+--------------+---------------+
> | Factor |       | Input    | Shuffle Read | Shuffle Write | Input    | Shuffle Read | Shuffle Write |
> +--------+-------+----------+--------------+---------------+----------+--------------+---------------+
> | 1      | 1     | 724 MB   |              | 4.2 MB        | 724 MB   |              | 2.7 MB        |
> | 1      | 2     | 23.0 MB  |              | 4.0 MB        | 23.0 MB  |              | 22.9 KB       |
> | 1      | 3     |          | 8.2 MB       | 11.0 KB       |          | 2.7 MB       | 11.0 KB       |
> | 1      | 4     |          | 11.0 KB      |               |          | 11.0 KB      |               |
> | 10     | 1     | 7.2 GB   |              | 43.5 MB       | 7.2 GB   |              | 28.0 MB       |
> | 10     | 2     | 232 MB   |              | 39.1 MB       | 232 MB   |              | 146.2 KB      |
> | 10     | 3     |          | 82.5 MB      | 11.0 KB       |          | 28.1 MB      | 11.0 KB       |
> | 10     | 4     |          | 11.0 KB      |               |          | 11.0 KB      |               |
> | 100    | 1     | 74.1 GB  |              | 448 MB        | 74.1 GB  |              | 266 MB        |
> | 100    | 2     | 2.3 GB   |              | 385 MB        | 2.3 GB   |              | 1570 KB       |
> | 100    | 3     |          | 834 MB       | 11.0 KB       |          | 288 MB       | 11.0 KB       |
> | 100    | 4     |          | 11.0 KB      |               |          | 11.0 KB      |               |
> +--------+-------+----------+--------------+---------------+----------+--------------+---------------+
> 
> The reduction in shuffle read and write amplification for each scale factor
> is shown in the following table (shuffle data is the sum of the shuffle
> writes across all stages; Rate is Q19 divided by Q19 modified).
> +-------------------+------------------+---------------------------+------+
> | TPCH Scale Factor | Q19 Shuffle Data | Q19 Modified Shuffle Data | Rate |
> +-------------------+------------------+---------------------------+------+
> | 1                 | 8.211 MB         | 2.733 MB                  | 3.00 |
> | 10                | 82.611 MB        | 28.157 MB                 | 2.93 |
> | 100               | 834.311 MB       | 288.081 MB                | 2.89 |
> +-------------------+------------------+---------------------------+------+
> So, as you can see, shuffle read and write amplification can be reduced by
> roughly a factor of 3 if we push more intelligence toward the storage layer.
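> 
> (The Rate column can be reproduced, up to rounding, from the shuffle totals
> above; a quick check in the Scala shell:)
> 
> // Totals in MB taken from the table above: (scale factor, Q19, Q19 modified).
> val shuffleMB = Seq((1, 8.211, 2.733), (10, 82.611, 28.157), (100, 834.311, 288.081))
> shuffleMB.foreach { case (sf, orig, mod) =>
>   println(f"SF $sf%3d: ${orig / mod}%.2fx less shuffle data")
> }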
> 
> --
> View this message in context: 
> http://apache-spark-developers-list.1001551.n3.nabble.com/Spark-SQL-Reduce-Shuffle-Data-by-pushing-filter-toward-storage-tp17297.html
> Sent from the Apache Spark Developers List mailing list archive at Nabble.com.
> 

---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscr...@spark.apache.org
For additional commands, e-mail: dev-h...@spark.apache.org
