[ 
https://issues.apache.org/jira/browse/SPARK-4644?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14937033#comment-14937033
 ] 

vidmantas zemleris edited comment on SPARK-4644 at 9/30/15 3:53 PM:
--------------------------------------------------------------------

IMHO, a good start would be to implement the special case of a left join with 
NULL values in the join condition. That would be much simpler and more 
efficient than the proposed generic skew join implementation [1]. 

Currently we're only interested in this special case of NULL values. Consider:

{code:sql}
SELECT * FROM t1
LEFT JOIN t2 
ON (t1.nullableCol = t2.nullableCol AND t1.portal = t2.portal)
{code}

The problem is that if the join key contains a column with many NULL values 
(and it is the only join column, or the other join columns have low 
cardinality), all of those rows get shuffled to one or a few tasks, which 
causes straggler tasks or makes the job fail altogether.
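
To illustrate the effect, here is a minimal sketch in plain Scala. It is a simplified model of hash partitioning, not Spark's actual partitioner; the object and method names are made up for the illustration, and a NULL key is modelled as `None`.

```scala
// Simplified model of hash partitioning; this is not Spark's actual partitioner.
object NullSkewDemo {
  // Non-negative modulo, so negative hash codes still map to a valid partition.
  def partitionOf(key: Option[String], numPartitions: Int): Int = {
    val h = key.hashCode % numPartitions
    if (h < 0) h + numPartitions else h
  }

  // Count how many rows land in each partition.
  def partitionSizes(keys: Seq[Option[String]], numPartitions: Int): Map[Int, Int] =
    keys.groupBy(partitionOf(_, numPartitions)).map { case (p, ks) => p -> ks.size }

  def main(args: Array[String]): Unit = {
    // 9,000 rows with a NULL key (None) and 1,000 rows with distinct keys.
    val keys = Seq.fill[Option[String]](9000)(None) ++
      (1 to 1000).map(i => Option(s"key-$i"))
    partitionSizes(keys, numPartitions = 8).toSeq.sortBy(_._1).foreach {
      case (p, n) => println(s"partition $p: $n rows")
    }
  }
}
```

All rows that share a key hash to the same partition, so whichever partition receives the NULL key holds at least 9,000 of the 10,000 rows, no matter how many partitions there are.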

A naive solution could be along these lines:

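{code:scala}
import org.apache.spark.sql.{Column, DataFrame}
import org.apache.spark.sql.functions.{col, lit}

// leftKeys (an assumed extra parameter) lists the left-side columns used in joinCondition.
def nullAwareLeftJoin(left: DataFrame, right: DataFrame,
    joinCondition: Column, leftKeys: Seq[Column]): DataFrame = {
  val hasNullKey = leftKeys.map(_.isNull).reduce(_ || _)
  // A NULL key never satisfies an equality condition, so these rows can skip the shuffle.
  val rowsWithNulls = left.filter(hasNullKey)
  // Left outer join, so non-NULL rows without a match are kept as well.
  val safeRows = left.filter(!hasNullKey).join(right, joinCondition, "left_outer")

  // Pad the skipped rows with typed NULLs for every right-side column.
  val nullCols = right.schema.fields.map(f => lit(null).cast(f.dataType).as(f.name))
  safeRows.unionAll(rowsWithNulls.select(col("*") +: nullCols: _*))
}
{code}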
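
The reason the split is safe: under SQL semantics NULL never equals anything, so a left row with a NULL key always ends up as a single NULL-padded output row. A small model on plain Scala collections (the case classes `L` and `R` and both functions are hypothetical, only for illustration) shows that the split version should return the same rows as an ordinary left join:

```scala
object NullAwareJoinModel {
  final case class L(id: Int, key: Option[String])
  final case class R(key: Option[String], value: String)

  // Reference left join with SQL semantics: a NULL key (None) never matches.
  def leftJoin(left: Seq[L], right: Seq[R]): Seq[(L, Option[R])] =
    left.flatMap { l =>
      val matches = right.filter(r => l.key.isDefined && l.key == r.key)
      if (matches.isEmpty) Seq((l, Option.empty[R]))
      else matches.map(r => (l, Option(r)))
    }

  // Split version: NULL-key rows bypass the join and are padded directly.
  def nullAwareLeftJoin(left: Seq[L], right: Seq[R]): Seq[(L, Option[R])] = {
    val (withKey, withNull) = left.partition(_.key.isDefined)
    leftJoin(withKey, right) ++ withNull.map(l => (l, Option.empty[R]))
  }
}
```

Only the row order may differ between the two, which does not matter for a join result.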

Likely this could be more efficient if implemented internally...


What do you think, guys?
--

[1] https://github.com/tresata/spark-skewjoin 

> A skew join is just like a normal join except that keys with large amounts of 
> values are not processed by a single task but instead spread out across many 
> tasks. This is achieved by replicating key-value pairs for one side of the 
> join in such way that they go to multiple tasks...
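
For comparison, the replication idea quoted above can be modelled roughly as follows on plain Scala collections. This is only a sketch of the general "salting" technique, not the code from [1]; the names `saltLeft`, `replicateRight` and `saltedJoin` and the `replicas` parameter are assumptions made for the illustration.

```scala
object SaltedJoinModel {
  // Spread the left rows of each key over `replicas` sub-keys
  // (the row index serves as a deterministic salt here).
  def saltLeft[A](left: Seq[(String, A)], replicas: Int): Seq[((String, Int), A)] =
    left.zipWithIndex.map { case ((k, a), i) => ((k, i % replicas), a) }

  // Replicate each right row once per salt value, so every sub-key finds its match.
  def replicateRight[B](right: Seq[(String, B)], replicas: Int): Seq[((String, Int), B)] =
    for ((k, b) <- right; salt <- 0 until replicas) yield ((k, salt), b)

  // Inner join on the salted key, then drop the salt again.
  def saltedJoin[A, B](left: Seq[(String, A)], right: Seq[(String, B)],
      replicas: Int): Seq[(String, (A, B))] = {
    val rightByKey = replicateRight(right, replicas).groupBy(_._1)
    for {
      (saltedKey, a) <- saltLeft(left, replicas)
      (_, b) <- rightByKey.getOrElse(saltedKey, Seq.empty)
    } yield (saltedKey._1, (a, b))
  }
}
```

The price of this generic approach is that one side of the join is copied `replicas` times, which is why skipping NULL keys altogether looks cheaper for the special case above.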



> Implement skewed join
> ---------------------
>
>                 Key: SPARK-4644
>                 URL: https://issues.apache.org/jira/browse/SPARK-4644
>             Project: Spark
>          Issue Type: Improvement
>          Components: Spark Core
>            Reporter: Shixiong Zhu
>         Attachments: Skewed Join Design Doc.pdf
>
>
> Skewed data is not rare. For example, a book recommendation site may have 
> several books which are liked by most of the users. Running ALS on such 
> skewed data will raise a OutOfMemory error, if some book has too many users 
> which cannot be fit into memory. To solve it, we propose a skewed join 
> implementation.


