[ https://issues.apache.org/jira/browse/SPARK-4644?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14937033#comment-14937033 ]
vidmantas zemleris edited comment on SPARK-4644 at 9/30/15 3:53 PM:
--------------------------------------------------------------------

IMHO a good start could be implementing the special case of a left join with NULL values in the join conditions (this would be much simpler and more efficient than the generic skew join implementation proposed in [1]).

Currently we're only interested in this special case of NULL values. Consider:

{code:sql}
SELECT * FROM t1 LEFT JOIN t2 ON (t1.nullableCol = t2.nullableCol AND t1.portal = t2.portal)
{code}

The problem is that if the join key contains a column with many NULL values (and it's the only join column, or the other join columns have low cardinality), all those rows get shuffled to one or a few tasks, causing straggler tasks or failing the job altogether. Yet since NULL = NULL never evaluates to true in SQL, these rows can never match anything in t2: in a LEFT JOIN they simply come out with NULLs for all of t2's columns, so shuffling them is wasted work.

A naive solution could be along these lines:

{code:scala}
// Pseudocode: containsNullValues and addNullsForMissingColumns are hypothetical helpers, not existing Spark API.
def nullAwareLeftJoin(left: DataFrame, right: DataFrame, joinConditions: Column) = {
  // rows with a NULL in the join key can never match, so keep them out of the shuffle
  val rowsWithNulls = left.filter(joinConditions.containsNullValues)
  // only rows with non-NULL keys go through the join; it must be a left outer
  // join, otherwise unmatched non-NULL rows would be dropped
  val safeRows = left
    .filter(!joinConditions.containsNullValues)
    .join(right, joinConditions, "left_outer")
  // pad the NULL-key rows with NULLs for the right-side columns and append them
  safeRows.unionAll(
    rowsWithNulls.addNullsForMissingColumns(safeRows.columns)
  )
}
{code}

This would likely be more efficient if implemented internally. What do you think, guys?

--
[1] https://github.com/tresata/spark-skewjoin
> A skew join is just like a normal join except that keys with large amounts of
> values are not processed by a single task but instead spread out across many
> tasks. This is achieved by replicating key-value pairs for one side of the
> join in such way that they go to multiple tasks...

> Implement skewed join
> ---------------------
>
> Key: SPARK-4644
> URL: https://issues.apache.org/jira/browse/SPARK-4644
> Project: Spark
> Issue Type: Improvement
> Components: Spark Core
> Reporter: Shixiong Zhu
> Attachments: Skewed Join Design Doc.pdf
>
>
> Skewed data is not rare. For example, a book recommendation site may have
> several books which are liked by most of the users. Running ALS on such
> skewed data will raise an OutOfMemory error if some book has too many users
> to fit into memory. To solve it, we propose a skewed join implementation.
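For illustration, here is a runnable take on the naive null-aware left join proposed in the comment, as a minimal sketch against the Spark 2.x+ DataFrame API (it uses `union`, since `unionAll` was deprecated in Spark 2.0). It assumes the join condition is a conjunction of equalities on key columns with the same names on both sides, so it takes those names (`keys`) instead of an arbitrary `Column`; this avoids the hypothetical `containsNullValues` and `addNullsForMissingColumns` helpers. It also assumes the two inputs share no non-key column names. The object name `NullAwareLeftJoin` and the toy tables in `main` are hypothetical.

{code:scala}
import org.apache.spark.sql.{Column, DataFrame, SparkSession}
import org.apache.spark.sql.functions.{col, lit}

// Hypothetical helper object, not part of Spark. Assumes Spark 2.x or later.
object NullAwareLeftJoin {

  /** Left outer join of `left` and `right` on equality of the columns named in
    * `keys`, which must exist with the same name and type on both sides.
    * Assumes no non-key column name appears in both inputs.
    */
  def nullAwareLeftJoin(left: DataFrame, right: DataFrame, keys: Seq[String]): DataFrame = {
    // true when any key column of a left row is NULL; such a row can never
    // satisfy an equality join condition
    val anyKeyNull: Column = keys.map(k => left(k).isNull).reduce(_ || _)

    // NULL-key rows bypass the join (and its shuffle) entirely
    val rowsWithNulls = left.filter(anyKeyNull)

    // rows whose keys are all non-NULL go through an ordinary left outer join;
    // its output columns are: keys, left's other columns, right's other columns
    val safeRows = left.filter(!anyKeyNull).join(right, keys, "left_outer")

    // give the bypassed rows right's non-key columns as typed NULLs
    val rightOnly = right.columns.filterNot(c => keys.contains(c))
    val padded = rightOnly.foldLeft(rowsWithNulls) { (df, c) =>
      df.withColumn(c, lit(null).cast(right.schema(c).dataType))
    }

    // union matches columns by position, so reorder to safeRows' layout first
    safeRows.union(padded.select(safeRows.columns.map(c => col(c)): _*))
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("null-aware-join").getOrCreate()
    import spark.implicits._

    // toy data: one left row has a NULL key
    val t1 = Seq((Option(1), "a"), (Option.empty[Int], "b"), (Option(3), "c")).toDF("k", "v1")
    val t2 = Seq((1, "x"), (2, "y")).toDF("k", "v2")

    nullAwareLeftJoin(t1, t2, Seq("k")).show()
    spark.stop()
  }
}
{code}

The NULL-key rows never enter a shuffle, so they cannot pile up in a single task. The trade-off is that `left` is evaluated twice (once per filter), so caching it may pay off when it is expensive to recompute.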
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)