[ 
https://issues.apache.org/jira/browse/SPARK-8287?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14580403#comment-14580403
 ] 

Reynold Xin edited comment on SPARK-8287 at 12/17/15 6:58 AM:
--------------------------------------------------------------

Sorry [~lian cheng] , see this:

{code}
scala> sql("select * from dw.src_view where ds='2' ").queryExecution.analyzed
res258: org.apache.spark.sql.catalyst.plans.logical.LogicalPlan = 
Project [s1#943L,s2#944L,ds#945]
 Filter (ds#945 = 2)
  Subquery src_view
   Aggregate [ds#949], [SUM(CAST(key#947, LongType)) AS 
s1#943L,SUM(CAST(key#950, LongType)) AS s2#944L,ds#949 AS ds#945]
    Join Inner, Some((ds#946 = ds#949))
     MetastoreRelation dw, src_partitioned1, Some(a)
     MetastoreRelation dw, src_partitioned2, Some(b)

scala> sql("select * from dw.src_view where ds='2' 
").queryExecution.optimizedPlan
15/06/10 19:15:39 INFO ParseDriver: Parsing command: select * from dw.src_view 
where ds='2'
15/06/10 19:15:39 INFO ParseDriver: Parse Completed
15/06/10 19:15:39 INFO HiveMetaStore: 0: get_table : db=dw tbl=src_view
15/06/10 19:15:39 INFO audit: ugi=shengli       ip=unknown-ip-addr      
cmd=get_table : db=dw tbl=src_view      
15/06/10 19:15:39 INFO ParseDriver: Parsing command: select sum(`a`.`key`) 
`s1`, sum(`b`.`key`) `s2`, `b`.`ds` `ds` from `dw`.`src_partitioned1` `a` join 
`dw`.`src_partitioned2` `b` on `a`.`ds` = `b`.`ds` group by `b`.`ds`
15/06/10 19:15:39 INFO ParseDriver: Parse Completed
15/06/10 19:15:39 INFO HiveMetaStore: 0: get_table : db=dw tbl=src_partitioned1
15/06/10 19:15:39 INFO audit: ugi=shengli       ip=unknown-ip-addr      
cmd=get_table : db=dw tbl=src_partitioned1      
15/06/10 19:15:39 INFO HiveMetaStore: 0: get_partitions : db=dw 
tbl=src_partitioned1
15/06/10 19:15:39 INFO audit: ugi=shengli       ip=unknown-ip-addr      
cmd=get_partitions : db=dw tbl=src_partitioned1 
15/06/10 19:15:39 INFO HiveMetaStore: 0: get_table : db=dw tbl=src_partitioned2
15/06/10 19:15:39 INFO audit: ugi=shengli       ip=unknown-ip-addr      
cmd=get_table : db=dw tbl=src_partitioned2      
15/06/10 19:15:39 INFO HiveMetaStore: 0: get_partitions : db=dw 
tbl=src_partitioned2
15/06/10 19:15:39 INFO audit: ugi=shengli       ip=unknown-ip-addr      
cmd=get_partitions : db=dw tbl=src_partitioned2 
res257: org.apache.spark.sql.catalyst.plans.logical.LogicalPlan = 
Filter (ds#936 = 2)
 Aggregate [ds#940], [SUM(CAST(key#938, LongType)) AS s1#934L,SUM(CAST(key#941, 
LongType)) AS s2#935L,ds#940 AS ds#936]
  Project [ds#940,key#938,key#941]
   Join Inner, Some((ds#937 = ds#940))
    Project [key#938,ds#937]
     MetastoreRelation dw, src_partitioned1, Some(a)
    Project [ds#940,key#941]
     MetastoreRelation dw, src_partitioned2, Some(b)

scala> sql("select * from dw.src_view where ds='2' 
").queryExecution.executedPlan
res259: org.apache.spark.sql.execution.SparkPlan = 
Filter (ds#954 = 2)
 Aggregate false, [ds#958], [SUM(PartialSum#963L) AS 
s1#952L,SUM(PartialSum#964L) AS s2#953L,ds#958 AS ds#954]
  Exchange (HashPartitioning [ds#958], 200)
   Aggregate true, [ds#958], [ds#958,SUM(CAST(key#956, LongType)) AS 
PartialSum#963L,SUM(CAST(key#959, LongType)) AS PartialSum#964L]
    Project [ds#958,key#956,key#959]
     ShuffledHashJoin [ds#955], [ds#958], BuildRight
      Exchange (HashPartitioning [ds#955], 200)
       HiveTableScan [key#956,ds#955], (MetastoreRelation dw, src_partitioned1, 
Some(a)), None
      Exchange (HashPartitioning [ds#958], 200)
       HiveTableScan [ds#958,key#959], (MetastoreRelation dw, src_partitioned2, 
Some(b)), None
{code}


was (Author: oopsoutofmemory):
Sorry [~lian cheng] , see this:
scala> sql("select * from dw.src_view where ds='2' ").queryExecution.analyzed
res258: org.apache.spark.sql.catalyst.plans.logical.LogicalPlan = 
Project [s1#943L,s2#944L,ds#945]
 Filter (ds#945 = 2)
  Subquery src_view
   Aggregate [ds#949], [SUM(CAST(key#947, LongType)) AS 
s1#943L,SUM(CAST(key#950, LongType)) AS s2#944L,ds#949 AS ds#945]
    Join Inner, Some((ds#946 = ds#949))
     MetastoreRelation dw, src_partitioned1, Some(a)
     MetastoreRelation dw, src_partitioned2, Some(b)

scala> sql("select * from dw.src_view where ds='2' 
").queryExecution.optimizedPlan
15/06/10 19:15:39 INFO ParseDriver: Parsing command: select * from dw.src_view 
where ds='2'
15/06/10 19:15:39 INFO ParseDriver: Parse Completed
15/06/10 19:15:39 INFO HiveMetaStore: 0: get_table : db=dw tbl=src_view
15/06/10 19:15:39 INFO audit: ugi=shengli       ip=unknown-ip-addr      
cmd=get_table : db=dw tbl=src_view      
15/06/10 19:15:39 INFO ParseDriver: Parsing command: select sum(`a`.`key`) 
`s1`, sum(`b`.`key`) `s2`, `b`.`ds` `ds` from `dw`.`src_partitioned1` `a` join 
`dw`.`src_partitioned2` `b` on `a`.`ds` = `b`.`ds` group by `b`.`ds`
15/06/10 19:15:39 INFO ParseDriver: Parse Completed
15/06/10 19:15:39 INFO HiveMetaStore: 0: get_table : db=dw tbl=src_partitioned1
15/06/10 19:15:39 INFO audit: ugi=shengli       ip=unknown-ip-addr      
cmd=get_table : db=dw tbl=src_partitioned1      
15/06/10 19:15:39 INFO HiveMetaStore: 0: get_partitions : db=dw 
tbl=src_partitioned1
15/06/10 19:15:39 INFO audit: ugi=shengli       ip=unknown-ip-addr      
cmd=get_partitions : db=dw tbl=src_partitioned1 
15/06/10 19:15:39 INFO HiveMetaStore: 0: get_table : db=dw tbl=src_partitioned2
15/06/10 19:15:39 INFO audit: ugi=shengli       ip=unknown-ip-addr      
cmd=get_table : db=dw tbl=src_partitioned2      
15/06/10 19:15:39 INFO HiveMetaStore: 0: get_partitions : db=dw 
tbl=src_partitioned2
15/06/10 19:15:39 INFO audit: ugi=shengli       ip=unknown-ip-addr      
cmd=get_partitions : db=dw tbl=src_partitioned2 
res257: org.apache.spark.sql.catalyst.plans.logical.LogicalPlan = 
Filter (ds#936 = 2)
 Aggregate [ds#940], [SUM(CAST(key#938, LongType)) AS s1#934L,SUM(CAST(key#941, 
LongType)) AS s2#935L,ds#940 AS ds#936]
  Project [ds#940,key#938,key#941]
   Join Inner, Some((ds#937 = ds#940))
    Project [key#938,ds#937]
     MetastoreRelation dw, src_partitioned1, Some(a)
    Project [ds#940,key#941]
     MetastoreRelation dw, src_partitioned2, Some(b)

scala> sql("select * from dw.src_view where ds='2' 
").queryExecution.executedPlan
res259: org.apache.spark.sql.execution.SparkPlan = 
Filter (ds#954 = 2)
 Aggregate false, [ds#958], [SUM(PartialSum#963L) AS 
s1#952L,SUM(PartialSum#964L) AS s2#953L,ds#958 AS ds#954]
  Exchange (HashPartitioning [ds#958], 200)
   Aggregate true, [ds#958], [ds#958,SUM(CAST(key#956, LongType)) AS 
PartialSum#963L,SUM(CAST(key#959, LongType)) AS PartialSum#964L]
    Project [ds#958,key#956,key#959]
     ShuffledHashJoin [ds#955], [ds#958], BuildRight
      Exchange (HashPartitioning [ds#955], 200)
       HiveTableScan [key#956,ds#955], (MetastoreRelation dw, src_partitioned1, 
Some(a)), None
      Exchange (HashPartitioning [ds#958], 200)
       HiveTableScan [ds#958,key#959], (MetastoreRelation dw, src_partitioned2, 
Some(b)), None

> Filters not pushed with substitution through aggregation
> --------------------------------------------------------
>
>                 Key: SPARK-8287
>                 URL: https://issues.apache.org/jira/browse/SPARK-8287
>             Project: Spark
>          Issue Type: Improvement
>          Components: SQL
>    Affects Versions: 1.3.1
>            Reporter: Li Sheng
>            Assignee: Li Sheng
>             Fix For: 1.6.0
>
>   Original Estimate: 40h
>  Remaining Estimate: 40h
>
> Filter not push down through Subquery or View. Assume we have two big  
> partitioned table join inner a Subquery or a View and filter not push down, 
> this will cause a full partition join and will cause performance issues.
> Let me give and example that can reproduce the problem:
> {code:sql}
> create table src(key int, value string);
> -- Creates partitioned table and imports data
> CREATE TABLE src_partitioned1 (key int, value STRING) PARTITIONED BY (ds 
> STRING);
> insert overwrite table src_partitioned1 PARTITION (ds='1') select key, value 
> from src;
> insert overwrite table src_partitioned1 PARTITION (ds='2') select key, value 
> from src;
> CREATE TABLE src_partitioned2 (key int, value STRING) PARTITIONED BY (ds 
> STRING);
> insert overwrite table src_partitioned2 PARTITION (ds='1') select key, value 
> from src;
> insert overwrite table src_partitioned2 PARTITION (ds='2') select key, value 
> from src;
> -- Creates views
> create view src_view as select sum(a.key) s1, sum(b.key) s2, b.ds ds from 
> src_partitioned1 a join src_partitioned2 b on a.ds = b.ds group by b.ds
> create view src_view_1 as select sum(a.key) s1, sum(b.key) s2, b.ds my_ds 
> from src_partitioned1 a join src_partitioned2 b on a.ds = b.ds group by b.ds
> -- QueryExecution
> select * from dw.src_view where ds='2'
> {code}
> {noformat}
> sql("select * from dw.src_view where ds='2' ").queryExecution
> == Parsed Logical Plan ==
> 'Project [*]
>  'Filter ('ds = 2)
>   'UnresolvedRelation [dw,src_view], None
> == Analyzed Logical Plan ==
> Project [s1#60L,s2#61L,ds#62]
>  Filter (ds#62 = 2)
>   Subquery src_view
>    Aggregate [ds#66], [SUM(CAST(key#64, LongType)) AS s1#60L,SUM(CAST(key#67, 
> LongType)) AS s2#61L,ds#66 AS ds#62]
>     Join Inner, Some((ds#63 = ds#66))
>      MetastoreRelation dw, src_partitioned1, Some(a)
>      MetastoreRelation dw, src_partitioned2, Some(b)
> == Optimized Logical Plan ==
> Filter (ds#62 = 2)
>  Aggregate [ds#66], [SUM(CAST(key#64, LongType)) AS s1#60L,SUM(CAST(key#67, 
> LongType)) AS s2#61L,ds#66 AS ds#62]
>   Project [ds#66,key#64,key#67]
>    Join Inner, Some((ds#63 = ds#66))
>     Project [key#64,ds#63]
>      MetastoreRelation dw, s...
> {noformat}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org

Reply via email to