[ 
https://issues.apache.org/jira/browse/SPARK-19730?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Shawn Lavelle updated SPARK-19730:
----------------------------------
    Description: 
When a Spark SQL query contains a subquery in the WHERE clause, such as a 
predicate subquery using the IN operator, the results of that subquery are not 
pushed down as a filter to the Data Source API for the outer query. 

Example: 
Select point, time, value from data where time between now()-86400 and now() 
and point in (select point from groups where group_id=5);

Two queries are sent to the data source: one for the subquery and another 
for the outer query. The subquery works correctly and returns the points in 
the group, but the outer query does not push a filter for the point column.

Effect:
The "groups" table has a few hundred rows to group a few hundred thousand 
points.  The data table has several billion rows keyed by point and time.  
Without the ability to push down filters for the columns of the outer 
query, the data source cannot properly conduct its pruned scan.

The subquery results should be pushed down to the outer query as an IN Filter 
with the results of the subquery.
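
Until the optimizer does this itself, the requested behaviour can be approximated by hand. The following is a minimal sketch, not a fix for the issue: it runs the subquery first, then applies its result to the outer query as a literal IN list. The helper names in_predicate and query_with_pushed_in are hypothetical, `spark` is assumed to be a SparkSession, and the keys are assumed to be integers small enough in number to collect to the driver.

```python
def in_predicate(column, values):
    """Build a SQL IN predicate from integer keys (hypothetical helper)."""
    # Drop NULLs (they never match IN), dedupe, and force ints so that no
    # arbitrary text ends up in the generated SQL.
    keys = sorted({int(v) for v in values if v is not None})
    if not keys:
        return "1 = 0"  # an empty subquery result matches no rows
    return "{} IN ({})".format(column, ", ".join(str(k) for k in keys))


def query_with_pushed_in(spark, subquery_sql, outer_sql, column):
    """Run the subquery, then filter the outer query with a literal IN list.

    Only spark.sql(), DataFrame.collect() and DataFrame.where() are used.
    """
    rows = spark.sql(subquery_sql).collect()  # first query: the group members
    keys = [row[0] for row in rows]
    return spark.sql(outer_sql).where(in_predicate(column, keys))


# Usage with the example query from this report (table names as given above):
# df = query_with_pushed_in(
#     spark,
#     "select point from groups where group_id = 5",
#     "select point, time, value from data "
#     "where time between now()-86400 and now()",
#     "point")
# df.explain()  # check whether the scan now lists a filter on point
```

A literal IN list on a plain column should normally be offered to the data source as an In filter, but that is an expectation to confirm with explain() against the connector in use, not something this report verifies.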

== Parsed Logical Plan ==
Project ['cpid, 'value]
+- 'Filter ('cpid IN (list#280) && (('system_timestamp_ms >= ((cast('now() as bigint) * 1000) - 5000)) && ('system_timestamp_ms <= (cast('now() as bigint) * 1000))))
   :  +- 'Project [unresolvedalias('explode('cpids), None)]
   :     +- 'Filter ('station_id = 1)
   :        +- 'UnresolvedRelation `stations`
   +- 'UnresolvedRelation `timeseries`

== Analyzed Logical Plan ==
cpid: int, value: double
Project [cpid#265, value#272]
+- Filter (predicate-subquery#280 [(cpid#265 = col#283)] && ((system_timestamp_ms#266L >= ((cast(current_timestamp() as bigint) * cast(1000 as bigint)) - cast(5000 as bigint))) && (system_timestamp_ms#266L <= (cast(current_timestamp() as bigint) * cast(1000 as bigint)))))
   :  +- Project [col#283]
   :     +- Generate explode(cpids#275), false, false, [col#283]
   :        +- Filter (station_id#274 = 1)
   :           +- SubqueryAlias stations
   :              +- Relation[station#273,station_id#274,cpids#275] com.osii.hsh.analytics.spark.cassandra.HSHCassandraConnector@69a156b
   +- SubqueryAlias timeseries
      +- Relation[bucket#263L,split_factor#264,cpid#265,system_timestamp_ms#266L,source_timestamp_ms#267L,chronus_quality#268,tag#269,limits#270,quality#271,value#272] com.osii.hsh.analytics.spark.cassandra.HSHCassandraConnector@1bb602a0

== Optimized Logical Plan ==
Project [cpid#265, value#272]
+- Join LeftSemi, (cpid#265 = col#283)
   :- Filter ((system_timestamp_ms#266L >= 1487959796000) && (system_timestamp_ms#266L <= 1487959801000))
   :  +- Relation[bucket#263L,split_factor#264,cpid#265,system_timestamp_ms#266L,source_timestamp_ms#267L,chronus_quality#268,tag#269,limits#270,quality#271,value#272] com.osii.hsh.analytics.spark.cassandra.HSHCassandraConnector@1bb602a0
   +- Generate explode(cpids#275), false, false, [col#283]
      +- Project [cpids#275]
         +- Filter (station_id#274 = 1)
            +- Relation[station#273,station_id#274,cpids#275] com.osii.hsh.analytics.spark.cassandra.HSHCassandraConnector@69a156b

== Physical Plan ==
*Project [cpid#265, value#272]
+- SortMergeJoin [cpid#265], [col#283], LeftSemi
   :- *Sort [cpid#265 ASC NULLS FIRST], false, 0
   :  +- Exchange hashpartitioning(cpid#265, 20)
   :     +- *Filter ((system_timestamp_ms#266L >= 1487959796000) && (system_timestamp_ms#266L <= 1487959801000))
   :        +- *Scan com.osii.hsh.analytics.spark.cassandra.HSHCassandraConnector@1bb602a0 chronus_hsh_20xx.timeseries[bucket#263L,split_factor#264,cpid#265,system_timestamp_ms#266L,source_timestamp_ms#267L,chronus_quality#268,tag#269,limits#270,quality#271,value#272] PushedFilters: [GreaterThanOrEqual(system_timestamp_ms,1487959796000), LessThanOrEqual(system_timestamp_ms,14879..., ReadSchema: struct<bucket:bigint,split_factor:int,cpid:int,system_timestamp_ms:bigint,source_timestamp_ms:big...
   +- *Sort [col#283 ASC NULLS FIRST], false, 0
      +- Exchange hashpartitioning(col#283, 20)
         +- Generate explode(cpids#275), false, false, [col#283]
            +- *Project [cpids#275]
               +- *Filter (station_id#274 = 1)
                  +- *Scan com.osii.hsh.analytics.spark.cassandra.HSHCassandraConnector@69a156b chronus_hsh_20xx.stations[cpids#275,station_id#274] PushedFilters: [EqualTo(station_id,1)], ReadSchema: struct<cpids:array<int>>
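
The physical plan above shows the problem in its PushedFilters lists: the timeseries scan receives only the two system_timestamp_ms bounds and nothing for cpid. As a rough aid for checking this on other queries, here is a heuristic sketch that pulls the filtered column names out of explain() text. The function name pushed_filter_columns is hypothetical, and the parsing assumes the "PushedFilters: [...], ReadSchema: ..." layout seen in this plan, which may differ in other Spark versions.

```python
import re


def pushed_filter_columns(plan_text):
    """Return the column names mentioned in any PushedFilters list.

    Heuristic only: it relies on filters being printed as Name(column,value)
    and on each list being followed by 'ReadSchema' (or the end of the text).
    """
    # Plan text in e-mails and logs is often hard-wrapped, so flatten it first.
    flat = " ".join(plan_text.split())
    columns = set()
    for body in re.findall(r"PushedFilters: (.*?)(?:ReadSchema|$)", flat):
        # A column name is the word right after "(" that is followed by
        # "," or ")"; nested filters such as Or(EqualTo(a,1),...) also work.
        columns.update(re.findall(r"\((\w+)[,)]", body))
    return columns


# Usage: paste the output of df.explain() into a string, then
# "cpid" in pushed_filter_columns(plan_text) tells whether any filter
# on cpid reached the data source.
```

Applied to the plan in this report, the result contains system_timestamp_ms and station_id but not cpid, which matches the behaviour described.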

> Predicate Subqueries do not push results of subqueries to data source
> ---------------------------------------------------------------------
>
>                 Key: SPARK-19730
>                 URL: https://issues.apache.org/jira/browse/SPARK-19730
>             Project: Spark
>          Issue Type: Bug
>          Components: Optimizer, SQL
>    Affects Versions: 2.1.0
>            Reporter: Shawn Lavelle


