[ https://issues.apache.org/jira/browse/SPARK-22923?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Juhong Jung updated SPARK-22923:
--------------------------------
    Description: 
I found this issue while doing a self join to retrieve the last record for each 
key (similar to 
https://stackoverflow.com/questions/1313120/retrieving-the-last-record-in-each-group).

Currently, SortMergeJoinExec is chosen only if the join expression includes an 
equality expression, because the SparkStrategies pattern matcher uses 
ExtractEquiJoinKeys (see the apply method in SparkStrategies.scala).
When the join condition contains only non-equi expressions, it is not matched 
by ExtractEquiJoinKeys and falls through to the last case, so 
BroadcastNestedLoopJoinExec is chosen even if the data size is larger than 
spark.sql.autoBroadcastJoinThreshold.
For example, when the DataFrame is about 50 GB and 
spark.sql.autoBroadcastJoinThreshold is 10 MB, BroadcastNestedLoopJoinExec is 
still chosen and the large DataFrame is sent to the driver to be broadcast.
The job is then aborted because it exceeds spark.driver.maxResultSize, or the 
driver runs out of memory and dies.
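To make this dispatch concrete, here is a toy model in Scala. It is a hypothetical, heavily simplified sketch and not Spark's JoinSelection code: the expression classes (loosely named after Catalyst's EqualTo, LessThan and And), `extractEquiKeys`, and `select` are invented for illustration, and the real planner has more cases, such as shuffled hash join and, for some inner joins, a Cartesian product. It assumes a left outer join, as in the report, so only the right side is considered for broadcasting.

```scala
// Hypothetical toy model; these classes are NOT Spark's Catalyst expressions.
sealed trait Expr
final case class Col(name: String) extends Expr
final case class EqualTo(left: Expr, right: Expr) extends Expr
final case class LessThan(left: Expr, right: Expr) extends Expr
final case class And(left: Expr, right: Expr) extends Expr

object ToyJoinSelection {
  // Flatten a condition into its AND-ed conjuncts.
  def conjuncts(e: Expr): List[Expr] = e match {
    case And(l, r) => conjuncts(l) ++ conjuncts(r)
    case other     => List(other)
  }

  // Stand-in for ExtractEquiJoinKeys: matches only when at least one
  // conjunct is an equality, returning the equi-join key pairs.
  def extractEquiKeys(cond: Expr): Option[List[(Expr, Expr)]] = {
    val keys = conjuncts(cond).collect { case EqualTo(l, r) => (l, r) }
    if (keys.nonEmpty) Some(keys) else None
  }

  // Strategy choice for a left outer join, where only the right side may be
  // broadcast. Sizes are in bytes.
  def select(cond: Expr, rightSizeBytes: Long, thresholdBytes: Long): String =
    extractEquiKeys(cond) match {
      case Some(_) if rightSizeBytes <= thresholdBytes => "BroadcastHashJoinExec"
      case Some(_)                                     => "SortMergeJoinExec"
      // No equi keys: fall through to the last case, whatever the data size.
      case None                                        => "BroadcastNestedLoopJoinExec"
    }
}
```

With the numbers from the report (a 50 GB right side and a 10 MB threshold), `select(LessThan(Col("number"), Col("later_number")), ...)` returns "BroadcastNestedLoopJoinExec" because the size check is never reached, while wrapping the same condition in `And(EqualTo(...), ...)` returns "SortMergeJoinExec".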

I think sort merge join is a good join strategy for non-equi joins, so 
modifying the pattern matcher might be one way to make the Spark planner choose 
sort merge join for non-equi joins.
As a workaround, I have just added a trivial equality condition to the join 
expression so that sort merge join is used.
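The intuition behind preferring a sort-based strategy can be shown on a single machine. The sketch below is hypothetical and only illustrates the idea; it is not Spark's SortMergeJoinExec, which matches rows on equi-join keys. For a condition like `number < later_number`, sorting the right side once lets each left row find its first match with a binary search instead of scanning every right row, as a nested loop join does. `Row`, `upperBound`, `leftOuterLessThan`, and `latest` are invented names; `latest` mirrors the sample's left outer self join followed by `filter('later_id.isNull)`.

```scala
object SortBasedLessThanJoin {
  // Hypothetical row type with the two columns used in the sample.
  final case class Row(id: Int, number: Int)

  // Index of the first element of the ascending `xs` that is strictly greater
  // than `x` (xs.length if there is none).
  def upperBound(xs: IndexedSeq[Int], x: Int): Int = {
    var lo = 0
    var hi = xs.length
    while (lo < hi) {
      val mid = (lo + hi) >>> 1
      if (xs(mid) <= x) lo = mid + 1 else hi = mid
    }
    lo
  }

  // Left outer join on left.number < right.number. The right side is sorted
  // once; each left row then binary-searches for its first match instead of
  // comparing against every right row.
  def leftOuterLessThan(left: Seq[Row], right: Seq[Row]): Seq[(Row, Option[Row])] = {
    val sortedRight = right.sortBy(_.number).toIndexedSeq
    val numbers = sortedRight.map(_.number)
    left.flatMap { l =>
      val start = upperBound(numbers, l.number)
      if (start == sortedRight.length) Seq(l -> Option.empty[Row]) // no match: null-padded
      else sortedRight.drop(start).map(r => l -> Option(r))          // every row with a larger number
    }
  }

  // Counterpart of the self join followed by filter('later_id.isNull):
  // keep the rows that have no row with a larger number.
  def latest(rows: Seq[Row]): Seq[Row] =
    leftOuterLessThan(rows, rows).collect { case (l, None) => l }

  def main(args: Array[String]): Unit = {
    val rows = Seq(Row(1, 5), Row(2, 9), Row(3, 2))
    println(latest(rows)) // List(Row(2,9))
  }
}
```

The matched pairs of an inequality join can still be quadratic in number; sorting only avoids comparing pairs that cannot match.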


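Below is sample code.

{code:java}
// Assumes spark-shell or another session where `spark` is the active SparkSession.
import org.apache.spark.sql.functions.col
import spark.implicits._

// Deduplicated (id, number) rows; "number" acts as the ordering key.
val data = (1 to 10000).
  map(t => (t, scala.util.Random.nextInt(10000))).
  toDF("id", "number").
  dropDuplicates("number").
  as("data")
// The same rows with every column renamed to later_<name>.
val laterData = data.
  as("laterData").
  select(data.schema.fields.map(f => col(f.name).as("later_" + f.name)): _*)
// Match each row with every row that has a larger number, then keep the rows
// with no such match (the latest record).
val latestData = data.
  join(
    laterData,
    'number < 'later_number,
    "leftouter"
  ).
  filter('later_id.isNull)

// Print the physical plan to see which join strategy was chosen.
latestData.explain
{code}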



> Non-equi join(theta join) should use sort merge join
> ----------------------------------------------------
>
>                 Key: SPARK-22923
>                 URL: https://issues.apache.org/jira/browse/SPARK-22923
>             Project: Spark
>          Issue Type: Improvement
>          Components: Spark Core, SQL
>    Affects Versions: 2.2.1
>            Reporter: Juhong Jung
>

--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
