[ https://issues.apache.org/jira/browse/SPARK-22923?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16327881#comment-16327881 ]
Herman van Hovell commented on SPARK-22923:
-------------------------------------------
You cannot use a shuffling join for such problems: you would need all of the data for one of the join sides on every node. Using a sort to speed up the join will work when the join predicate contains a greater-than/less-than operator, but that is not a sort merge join.

> Non-equi join (theta join) should use sort merge join
> -----------------------------------------------------
>
>                 Key: SPARK-22923
>                 URL: https://issues.apache.org/jira/browse/SPARK-22923
>             Project: Spark
>          Issue Type: Improvement
>          Components: Spark Core, SQL
> Affects Versions: 2.2.1
>        Reporter: Juhong Jung
>        Priority: Major
>
> I found this issue while working on a self-join that retrieves the last record per key (similar to
> https://stackoverflow.com/questions/1313120/retrieving-the-last-record-in-each-group).
> Currently, SortMergeJoinExec is chosen only if the join expression includes an equality expression, because the SparkStrategies pattern matcher uses ExtractEquiJoinKeys (see the apply method in SparkStrategies.scala).
> When the join condition contains only non-equi expressions, it does not match ExtractEquiJoinKeys and falls through to the last case, so BroadcastNestedLoopJoinExec is chosen even when the data size is larger than spark.sql.autoBroadcastJoinThreshold.
> For example, with a DataFrame of about 50 GB and spark.sql.autoBroadcastJoinThreshold set to 10 MB, BroadcastNestedLoopJoinExec is still chosen and the large DataFrame is sent to the driver to be broadcast.
> The job is then aborted because of the spark.driver.maxResultSize limit, or the driver container dies from an OutOfMemoryError.
> I think sort merge join is a good join strategy for non-equi joins, so modifying the pattern matcher is one way to make the Spark planner choose sort merge join for them.
> As a workaround, I have added a trivial equality condition to the join expression so that sort merge join is used.
> Sample code:
> {code:java}
> import org.apache.spark.sql.functions.col
> import spark.implicits._
>
> val data = (1 to 10000).
>   map(t => (t, scala.util.Random.nextInt(10000))).
>   toDF("id", "number").
>   dropDuplicates("number").
>   as("data")
>
> // Rename every column with a "later_" prefix for the self-join.
> val laterData = data.
>   as("laterData").
>   select(data.schema.fields.map(f => col(f.name).as("later_" + f.name)): _*)
>
> // Keep only rows for which no row with a larger number exists,
> // i.e. the last record per key.
> val latestData = data.
>   join(
>     laterData,
>     'number < 'later_number,
>     "leftouter"
>   ).
>   filter('later_id.isNull)
>
> latestData.explain
> {code}
--
This message was sent by Atlassian JIRA (v7.6.3#76005)
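The "trivial equal condition" workaround mentioned in the report is not shown in the sample. A minimal sketch of one way to do it, assuming a constant column is added to each side so that ExtractEquiJoinKeys can extract an equi-key (the column names `k` and `k2` are illustrative assumptions, not from the original report):

{code:java}
// Sketch only: add a constant column to both sides and join on it, so the
// predicate contains an equality between the two plans and the planner can
// match ExtractEquiJoinKeys and pick SortMergeJoinExec instead of
// BroadcastNestedLoopJoinExec. Column names "k"/"k2" are assumed here.
import org.apache.spark.sql.functions.lit

val dataK = data.withColumn("k", lit(1))
val laterDataK = laterData.withColumn("k2", lit(1))

val latestDataSmj = dataK.
  join(
    laterDataK,
    'k === 'k2 && 'number < 'later_number,
    "leftouter"
  ).
  filter('later_id.isNull)

latestDataSmj.explain
{code}

Note that because the equi-key is a constant, all rows hash to a single partition, so this trades the driver-side broadcast problem for heavy skew on one executor; it is a stopgap, not a substitute for planner support.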