[ https://issues.apache.org/jira/browse/SPARK-19609?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16020287#comment-16020287 ]

Jayesh lalwani commented on SPARK-19609:
----------------------------------------

I'd just like to point out that this would immensely help applications that 
use the Structured Streaming capabilities of Spark, especially when 
implementing what [~matei] refers to as "Continuous applications".

One of the selling points of Structured Streaming is that you can join 
streaming data with data from batch sources, like databases. In these cases, 
streaming data frequently needs to be joined with database tables that contain 
millions (or even billions) of rows. This matches the exact profile this 
ticket describes: a smaller set of data being joined with a larger set.

The problem is that without this change, there is no good way of joining a 
streaming data frame with a large database table. You can either 
a) use spark.read.jdbc to read the huge table into a data frame, and then join 
it with a streaming data frame (a rough sketch of this option follows below) 
OR
b) call df.map/df.mapPartitions on your streaming data frame and run a 
database query yourself

The problem right now with a) is that the huge table is read at every 
execution of a micro batch. In fact, if your Spark application has multiple 
sinks, the read runs once for every sink, because Spark doesn't have a good 
mechanism for specifying data frames that should be cached for the duration of 
the micro batch. The problem with b) is that it adds complexity to the 
application-level code, and you are really leaving the DataFrame abstraction 
when you call the map function. For example, what if your SQL query returns 
1000x the records? Suddenly you have millions of records being created in each 
partition, and you have to tell Spark to repartition the output, which incurs 
cost.

Ideally, we would like to use spark.read.jdbc followed by a join with the 
streaming DataFrame, and have Spark push down the join predicates to the 
database. This would result in less data being pulled into Spark in every 
micro batch.
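
To make the desired behavior concrete, here is a rough sketch, on static data 
frames and with hypothetical names (smallDf, dim, join key "id"), of what we 
effectively want the planner to do for us automatically during a broadcast 
join. The JDBC source already knows how to push an In filter down to the 
database as an "id IN (...)" clause:

{code:scala}
import org.apache.spark.sql.functions.col

// Materialize the (small) set of join keys, as a broadcast join already does.
val keys: Array[Any] = smallDf.select("id").distinct().collect().map(_.get(0))

// Apply the keys as an IN filter on the JDBC-backed data frame. The JDBC
// source translates In(...) into "id IN (...)" in the generated SQL, so only
// matching rows are pulled out of the database.
val filteredDim = dim.filter(col("id").isin(keys: _*))

val joined = smallDf.join(filteredDim, "id")
{code}

Doing this by hand is awkward in a streaming job, which is exactly why having 
the optimizer derive the filter from the broadcast side would be so valuable.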

> Broadcast joins should pushdown join constraints as Filter to the larger 
> relation
> ---------------------------------------------------------------------------------
>
>                 Key: SPARK-19609
>                 URL: https://issues.apache.org/jira/browse/SPARK-19609
>             Project: Spark
>          Issue Type: Improvement
>          Components: SQL
>    Affects Versions: 2.1.0
>            Reporter: Nick Dimiduk
>
> For broadcast inner-joins, where the smaller relation is known to be small 
> enough to materialize on a worker, the set of values for all join columns is 
> known and fits in memory. Spark should translate these values into a 
> {{Filter}} pushed down to the datasource. The common join condition of 
> equality, i.e. {{lhs.a == rhs.a}}, can be written as an {{a in ...}} clause. 
> An example of pushing such filters is already present in the form of 
> {{IsNotNull}} filters via [~sameerag]'s work on SPARK-12957 subtasks.
>
> This optimization could even work when the smaller relation does not fit 
> entirely in memory. This could be done by partitioning the smaller relation 
> into N pieces, applying this predicate pushdown for each piece, and unioning 
> the results.
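
For reference, a minimal sketch of the data source Filter objects the 
description above is talking about, assuming a join column named "a" and 
purely illustrative key values: IsNotNull is what the SPARK-12957 work already 
pushes down, and In is what this ticket would derive from the broadcast side.

{code:scala}
import org.apache.spark.sql.sources.{Filter, In, IsNotNull}

// Already pushed down today for the join column (SPARK-12957 subtasks).
val notNull: Filter = IsNotNull("a")

// Proposed: the distinct join-key values from the materialized broadcast
// side, expressed as an In filter that a data source (e.g. the JDBC source)
// can translate into "a IN (...)".
val joinKeys: Array[Any] = Array(1, 2, 3) // illustrative values only
val inFilter: Filter = In("a", joinKeys)
{code}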


