Interesting analysis. Can you log a JIRA?
> On Apr 21, 2016, at 11:07 AM, atootoonchian <a...@levyx.com> wrote:
>
> The SQL query planner can be made intelligent enough to push filter
> predicates down toward the storage layer. If we optimize the planner so
> that I/O to storage is reduced at the cost of running more filters
> (i.e., compute), this is desirable whenever the system is I/O bound. An
> example from the TPCH benchmark that proves the point is below.
>
> Let's look at query q19 of the TPCH benchmark:
>
> select
>     sum(l_extendedprice * (1 - l_discount)) as revenue
> from lineitem, part
> where
>     (p_partkey = l_partkey
>      and p_brand = 'Brand#12'
>      and p_container in ('SM CASE', 'SM BOX', 'SM PACK', 'SM PKG')
>      and l_quantity >= 1 and l_quantity <= 1 + 10
>      and p_size between 1 and 5
>      and l_shipmode in ('AIR', 'AIR REG')
>      and l_shipinstruct = 'DELIVER IN PERSON')
>     or
>     (p_partkey = l_partkey
>      and p_brand = 'Brand#23'
>      and p_container in ('MED BAG', 'MED BOX', 'MED PKG', 'MED PACK')
>      and l_quantity >= 10 and l_quantity <= 10 + 10
>      and p_size between 1 and 10
>      and l_shipmode in ('AIR', 'AIR REG')
>      and l_shipinstruct = 'DELIVER IN PERSON')
>     or
>     (p_partkey = l_partkey
>      and p_brand = 'Brand#34'
>      and p_container in ('LG CASE', 'LG BOX', 'LG PACK', 'LG PKG')
>      and l_quantity >= 20 and l_quantity <= 20 + 10
>      and p_size between 1 and 15
>      and l_shipmode in ('AIR', 'AIR REG')
>      and l_shipinstruct = 'DELIVER IN PERSON')
>
> The latest version of Spark produces the following plan (not exact;
> simplified for readability) to execute q19:
>
> Aggregate [(sum(cast((l_extendedprice * (1.0 - l_discount))
>   Project [l_extendedprice, l_discount]
>     Join Inner, Some(((p_partkey = l_partkey) &&
>       ((((((p_brand = Brand#12) &&
>            p_container IN (SM CASE,SM BOX,SM PACK,SM PKG)) &&
>            (l_quantity >= 1.0)) && (l_quantity <= 11.0)) &&
>            (p_size <= 5)) ||
>         (((((p_brand = Brand#23) &&
>            p_container IN (MED BAG,MED BOX,MED PKG,MED PACK)) &&
>            (l_quantity >= 10.0)) && (l_quantity <= 20.0)) &&
>            (p_size <= 10))) ||
>         (((((p_brand = Brand#34) &&
>            p_container IN (LG CASE,LG BOX,LG PACK,LG PKG)) &&
>            (l_quantity >= 20.0)) && (l_quantity <= 30.0)) &&
>            (p_size <= 15)))))
>       Project [l_partkey, l_quantity, l_extendedprice, l_discount]
>         Filter ((isnotnull(l_partkey) &&
>                 (isnotnull(l_shipinstruct) &&
>                 (l_shipmode IN (AIR,AIR REG) &&
>                 (l_shipinstruct = DELIVER IN PERSON))))
>           LogicalRDD [l_orderkey, l_partkey, l_suppkey, l_linenumber,
>             l_quantity, l_extendedprice, l_discount, l_tax, l_returnflag,
>             l_linestatus, l_shipdate, l_commitdate, l_receiptdate,
>             l_shipinstruct, l_shipmode, l_comment], MapPartitionsRDD[316]
>       Project [p_partkey, p_brand, p_size, p_container]
>         Filter ((isnotnull(p_partkey) &&
>                 (isnotnull(p_size) &&
>                 (cast(cast(p_size as decimal(20,0)) as int) >= 1)))
>           LogicalRDD [p_partkey, p_name, p_mfgr, p_brand, p_type, p_size,
>             p_container, p_retailprice, p_comment], MapPartitionsRDD[314]
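A plan like this can be printed directly from the shell; a minimal
sketch, assuming the TPCH tables are already registered as temp tables
and that `q19` holds the query text quoted above:

    // Build the DataFrame for q19 and print its plans.
    // explain(true) prints the parsed, analyzed, optimized, and physical
    // plans; the pushed-down filters show up in the optimized plan.
    val q19Df = sqlContext.sql(q19)
    q19Df.explain(true)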
> As you can see, only three filter predicates are pushed down before the
> join is executed:
>
> l_shipmode IN (AIR,AIR REG)
> l_shipinstruct = DELIVER IN PERSON
> cast(cast(p_size as decimal(20,0)) as int) >= 1
>
> The following filters are applied during the join itself:
>
> p_brand = Brand#12
> p_container IN (SM CASE,SM BOX,SM PACK,SM PKG)
> l_quantity >= 1.0 && l_quantity <= 11.0
> p_size <= 5
> p_brand = Brand#23
> p_container IN (MED BAG,MED BOX,MED PKG,MED PACK)
> l_quantity >= 10.0 && l_quantity <= 20.0
> p_size <= 10
> p_brand = Brand#34
> p_container IN (LG CASE,LG BOX,LG PACK,LG PKG)
> l_quantity >= 20.0 && l_quantity <= 30.0
> p_size <= 15
>
> Now let's look at the following sequence of SQL commands, which produces
> the same result:
>
> val partDfFilter = sqlContext.sql("""
>   |select p_brand, p_partkey from part
>   |where
>   |  (p_brand = 'Brand#12'
>   |   and p_container in ('SM CASE', 'SM BOX', 'SM PACK', 'SM PKG')
>   |   and p_size between 1 and 5)
>   |  or
>   |  (p_brand = 'Brand#23'
>   |   and p_container in ('MED BAG', 'MED BOX', 'MED PKG', 'MED PACK')
>   |   and p_size between 1 and 10)
>   |  or
>   |  (p_brand = 'Brand#34'
>   |   and p_container in ('LG CASE', 'LG BOX', 'LG PACK', 'LG PKG')
>   |   and p_size between 1 and 15)
> """.stripMargin)
>
> val itemLineDfFilter = sqlContext.sql("""
>   |select
>   |  l_partkey, l_quantity, l_extendedprice, l_discount from lineitem
>   |where
>   |  (l_quantity >= 1 and l_quantity <= 30
>   |   and l_shipmode in ('AIR', 'AIR REG')
>   |   and l_shipinstruct = 'DELIVER IN PERSON')
> """.stripMargin)
>
> partDfFilter.registerTempTable("partFilter")
> itemLineDfFilter.registerTempTable("lineitemFilter")
>
> val q19Query = """
>   |select
>   |  sum(l_extendedprice * (1 - l_discount)) as revenue
>   |from
>   |  lineitemFilter,
>   |  partFilter
>   |where
>   |  (p_partkey = l_partkey
>   |   and p_brand = 'Brand#12'
>   |   and l_quantity >= 1 and l_quantity <= 1 + 10)
>   |  or
>   |  (p_partkey = l_partkey
>   |   and p_brand = 'Brand#23'
>   |   and l_quantity >= 10 and l_quantity <= 10 + 10)
>   |  or
>   |  (p_partkey = l_partkey
>   |   and p_brand = 'Brand#34'
>   |   and l_quantity >= 20 and l_quantity <= 20 + 10)
> """.stripMargin
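Before comparing plans, it is worth checking that the rewrite is
result-preserving. A minimal sketch, assuming `q19Original` holds the
original query text quoted at the top and that the aggregated revenue
comes back as a double:

    // Run both formulations and compare the single aggregated row.
    val originalRevenue = sqlContext.sql(q19Original).collect()(0).getDouble(0)
    val modifiedRevenue = sqlContext.sql(q19Query).collect()(0).getDouble(0)
    // Allow for floating-point noise in the summed revenue.
    assert(math.abs(originalRevenue - modifiedRevenue) < 1e-6,
      s"revenue mismatch: $originalRevenue vs $modifiedRevenue")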
> The following plan shows how Spark will execute the new q19 query:
>
> Aggregate [(sum(cast((l_extendedprice * (1.0 - l_discount))
>   Project [l_extendedprice, l_discount]
>     Join Inner, Some(((p_partkey = l_partkey) &&
>       (((((p_brand = Brand#12) &&
>            (l_quantity >= 1.0)) && (l_quantity <= 11.0)) ||
>          (((p_brand = Brand#23) &&
>            (l_quantity >= 10.0)) && (l_quantity <= 20.0))) ||
>          (((p_brand = Brand#34) &&
>            (l_quantity >= 20.0)) && (l_quantity <= 30.0)))))
>       Project [l_partkey, l_quantity, l_extendedprice, l_discount]
>         Filter ((isnotnull(l_partkey) &&
>                 ((isnotnull(l_shipinstruct) &&
>                 isnotnull(l_quantity)) &&
>                 (((cast(l_quantity as float) >= 1.0) &&
>                 (cast(l_quantity as float) <= 30.0)) &&
>                 (l_shipmode IN (AIR,AIR REG) &&
>                 (l_shipinstruct = DELIVER IN PERSON)))))
>           LogicalRDD [l_orderkey, l_partkey, l_suppkey, l_linenumber,
>             l_quantity, l_extendedprice, l_discount, l_tax, l_returnflag,
>             l_linestatus, l_shipdate, l_commitdate, l_receiptdate,
>             l_shipinstruct, l_shipmode, l_comment], MapPartitionsRDD[316]
>       Project [p_partkey, p_brand, p_size, p_container]
>         Filter ((isnotnull(p_partkey) &&
>                 isnotnull(cast(cast(p_partkey as decimal(20,0)) as int))) &&
>                 (isnotnull(p_size) &&
>                 ((cast(cast(p_size as decimal(20,0)) as int) >= 1) &&
>                 (((((p_brand = Brand#12) &&
>                 p_container IN (SM CASE,SM BOX,SM PACK,SM PKG)) &&
>                 (cast(cast(p_size as decimal(20,0)) as int) <= 5)) ||
>                 (((p_brand = Brand#23) &&
>                 p_container IN (MED BAG,MED BOX,MED PKG,MED PACK)) &&
>                 (cast(cast(p_size as decimal(20,0)) as int) <= 10))) ||
>                 (((p_brand = Brand#34) &&
>                 p_container IN (LG CASE,LG BOX,LG PACK,LG PKG)) &&
>                 (cast(cast(p_size as decimal(20,0)) as int) <= 15))))))
>           LogicalRDD [p_partkey, p_name, p_mfgr, p_brand, p_type,
>             p_size, p_container, p_retailprice, p_comment], MapPartitionsRDD[314]
>
> With the new approach, all filter predicates are pushed below the join:
>
> l_shipmode IN (AIR,AIR REG)
> l_shipinstruct = DELIVER IN PERSON
> cast(cast(p_size as decimal(20,0)) as int) >= 1
> p_brand = Brand#12
> p_container IN (SM CASE,SM BOX,SM PACK,SM PKG)
> l_quantity >= 1.0 && l_quantity <= 11.0
> p_size <= 5
> p_brand = Brand#23
> p_container IN (MED BAG,MED BOX,MED PKG,MED PACK)
> l_quantity >= 10.0 && l_quantity <= 20.0
> p_size <= 10
> p_brand = Brand#34
> p_container IN (LG CASE,LG BOX,LG PACK,LG PKG)
> l_quantity >= 20.0 && l_quantity <= 30.0
> p_size <= 15
>
> However, some filter predicates still need to be evaluated during the
> join in order to distinguish the different sets of items. In other
> words, these predicates are re-evaluated:
>
> p_brand = Brand#12
> l_quantity >= 1.0 && l_quantity <= 11.0
> p_brand = Brand#23
> l_quantity >= 10.0 && l_quantity <= 20.0
> p_brand = Brand#34
> l_quantity >= 20.0 && l_quantity <= 30.0
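The re-evaluation is sound because of a simple logical identity: a
predicate in disjunctive normal form implies, for each table, the OR of
the conjuncts that reference only that table, so that OR can be pushed
below the join as a pre-filter while the full predicate is re-checked
above it. A toy sketch of the extraction (plain Scala for illustration,
not Catalyst code; `Pred`, `Leaf`, and `pushable` are made-up names):

    // Toy predicate language; each leaf is tagged with the table it touches.
    sealed trait Pred
    case class Leaf(table: String, expr: String) extends Pred
    case class And(ps: Seq[Pred]) extends Pred  // empty And means "true"
    case class Or(ps: Seq[Pred]) extends Pred   // empty Or means "false"

    def conjuncts(p: Pred): Seq[Pred] = p match {
      case And(ps) => ps
      case other   => Seq(other)
    }

    // For a DNF predicate (Or of Ands), keep only the leaves that touch
    // `table` inside each disjunct. If some disjunct has no such leaf, its
    // And becomes empty ("true"), the pushed filter degenerates to "true",
    // and nothing is filtered out, which is the safe answer.
    def pushable(dnf: Pred, table: String): Pred = dnf match {
      case Or(disjuncts) =>
        Or(disjuncts.map(d =>
          And(conjuncts(d).collect { case l @ Leaf(t, _) if t == table => l })))
      case other => other
    }

Because the pushed filter is only implied by (not equivalent to) the
original predicate, the join must still apply the full condition, which
is exactly the re-evaluation listed above.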
> Our main goal in pushing filters down as far as possible is to minimize
> I/O and maximize processor utilization. So let's compare the original
> q19 and the modified q19 from an I/O point of view:
>
> +--------+-------+---------+--------------+---------------+---------+--------------+---------------+
> | TPCH   | Stage |                   Q19                  |              Q19 modified              |
> | Scale  |       +---------+--------------+---------------+---------+--------------+---------------+
> | Factor |       | Input   | Shuffle Read | Shuffle Write | Input   | Shuffle Read | Shuffle Write |
> +--------+-------+---------+--------------+---------------+---------+--------------+---------------+
> | 1      | 1     | 724 MB  |              | 4.2 MB        | 724 MB  |              | 2.7 MB        |
> | 1      | 2     | 23.0 MB |              | 4.0 MB        | 23.0 MB |              | 22.9 KB       |
> | 1      | 3     |         | 8.2 MB       | 11.0 KB       |         | 2.7 MB       | 11.0 KB       |
> | 1      | 4     |         | 11.0 KB      |               |         | 11.0 KB      |               |
> | 10     | 1     | 7.2 GB  |              | 43.5 MB       | 7.2 GB  |              | 28.0 MB       |
> | 10     | 2     | 232 MB  |              | 39.1 MB       | 232 MB  |              | 146.2 KB      |
> | 10     | 3     |         | 82.5 MB      | 11.0 KB       |         | 28.1 MB      | 11.0 KB       |
> | 10     | 4     |         | 11.0 KB      |               |         | 11.0 KB      |               |
> | 100    | 1     | 74.1 GB |              | 448 MB        | 74.1 GB |              | 266 MB        |
> | 100    | 2     | 2.3 GB  |              | 385 MB        | 2.3 GB  |              | 1570 KB       |
> | 100    | 3     |         | 834 MB       | 11.0 KB       |         | 288 MB       | 11.0 KB       |
> | 100    | 4     |         | 11.0 KB      |               |         | 11.0 KB      |               |
> +--------+-------+---------+--------------+---------------+---------+--------------+---------------+
>
> The reduction in read and write amplification for each scale factor is
> shown in the following table:
>
> +-------------------+------------------+---------------------------+------+
> | TPCH Scale Factor | Q19 Shuffle Data | Q19 Modified Shuffle Data | Rate |
> +-------------------+------------------+---------------------------+------+
> | 1                 | 8.211 MB         | 2.733 MB                  | 3.00 |
> | 10                | 82.611 MB        | 28.157 MB                 | 2.93 |
> | 100               | 834.311 MB       | 288.081 MB                | 2.89 |
> +-------------------+------------------+---------------------------+------+
>
> So, as you can see, shuffle read and write amplification can be reduced
> by a factor of roughly 3 if we push more intelligence toward the
> storage layer.
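As a quick arithmetic check on the reported rates, dividing the shuffle
totals above (hard-coded here from the table):

    // Reduction rate = original shuffle data / modified shuffle data.
    val measurements = Seq((8.211, 2.733), (82.611, 28.157), (834.311, 288.081))
    measurements.foreach { case (orig, modified) =>
      println(f"rate = ${orig / modified}%.2f")  // roughly 3x at every scale
    }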