[ https://issues.apache.org/jira/browse/SPARK-22923?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Juhong Jung updated SPARK-22923:
--------------------------------
Description:
I found this issue while doing a self join to retrieve the last record for each key (similar to https://stackoverflow.com/questions/1313120/retrieving-the-last-record-in-each-group).

Currently, SortMergeJoinExec is chosen only if the join condition includes an equality expression, because the SparkStrategies pattern matcher uses ExtractEquiJoinKeys (see the apply method in SparkStrategies.scala).
When the join condition contains only non-equi expressions, it does not match ExtractEquiJoinKeys and falls through to the last case. As a result, BroadcastNestedLoopJoinExec is chosen even if the data size is larger than spark.sql.autoBroadcastJoinThreshold.

For example, suppose a DataFrame is about 50 GB and spark.sql.autoBroadcastJoinThreshold is 10 MB. BroadcastNestedLoopJoinExec is still chosen, and the large DataFrame is sent to the driver to be broadcast.
The job is then aborted because of the spark.driver.maxResultSize limit, or the driver container dies with an OutOfMemoryError.

I think sort merge join is a good join strategy for non-equi joins. Modifying the pattern matcher may be one way to make the Spark planner choose sort merge join for non-equi joins.
As a workaround, I have simply added a trivial equality condition to the join expression so that sort merge join is used.

Below is sample code.
{code:java}
// Assumes spark-shell, where `spark` is predefined.
import spark.implicits._                    // toDF and the 'symbol column syntax
import org.apache.spark.sql.functions.col

// (id, number) pairs with random numbers; keep one row per distinct number
val data = (1 to 10000).
  map(t => (t, scala.util.Random.nextInt(10000))).
  toDF("id", "number").
  dropDuplicates("number").
  as("data")

// The same rows with every column renamed to later_<name>
val laterData = data.
  as("laterData").
  select(data.schema.fields.map(f => col(f.name).as("later_" + f.name)): _*)

// Left outer self join on a non-equi condition only; keep rows with no larger number
val latestData = data.
  join(
    laterData,
    'number < 'later_number,
    "leftouter"
  ).
  filter('later_id.isNull)

// Prints the physical plan, showing which join strategy was chosen
latestData.explain
{code}

was:
I found this issue while doing a self join to retrieve the last record for each key.
(similar to https://stackoverflow.com/questions/1313120/retrieving-the-last-record-in-each-group)

Currently, SortMergeJoinExec is chosen only if the join condition includes an equality expression, because the SparkStrategies pattern matcher uses ExtractEquiJoinKeys (see the apply method in SparkStrategies.scala).
When the join condition contains only non-equi expressions, it does not match ExtractEquiJoinKeys and falls through to the last case. As a result, BroadcastNestedLoopJoinExec is chosen even if the data size is larger than spark.sql.autoBroadcastJoinThreshold.

For example, suppose a DataFrame is about 50 GB and spark.sql.autoBroadcastJoinThreshold is 10 MB. BroadcastNestedLoopJoinExec is chosen, and the large DataFrame is sent to the driver to be broadcast.
The job is then aborted because of spark.driver.maxResultSize, or the driver runs out of memory and dies.

I think sort merge join is a good join strategy for non-equi joins. Modifying the pattern matcher may be one way to make the Spark planner choose sort merge join for non-equi joins.
As a workaround, I have simply added a trivial equality condition to the join expression so that sort merge join is used.

Below is sample code.
{code:java}
import spark.implicits._
import org.apache.spark.sql.functions.col

val data = (1 to 10000).
  map(t => (t, scala.util.Random.nextInt(10000))).
  toDF("id", "number").
  dropDuplicates("number").
  as("data")

val laterData = data.
  as("laterData").
  select(data.schema.fields.map(f => col(f.name).as("later_" + f.name)): _*)

val latestData = data.
  join(
    laterData,
    'number < 'later_number,
    "leftouter"
  ).
  filter('later_id.isNull)

latestData.explain
{code}

> Non-equi join(theta join) should use sort merge join
> ----------------------------------------------------
>
>                 Key: SPARK-22923
>                 URL: https://issues.apache.org/jira/browse/SPARK-22923
>             Project: Spark
>          Issue Type: Improvement
>          Components: Spark Core, SQL
>    Affects Versions: 2.2.1
>            Reporter: Juhong Jung
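The description argues that a sort-based strategy fits this kind of non-equi join. The following Spark-free sketch illustrates the idea on plain Scala collections, assuming both inputs fit in memory. The `Record` type and the function names are hypothetical, and the sketch does not reflect how Spark's `SortMergeJoinExec` is implemented. It evaluates the sample's `'number < 'later_number` left outer join by sorting both sides and scanning with a forward-only pointer, rather than comparing every pair as a nested loop join does:

```scala
// Hypothetical in-memory stand-in for one row of the sample DataFrame (id, number).
final case class Record(id: Int, number: Int)

object NonEquiSortMergeSketch {

  /** Left outer join on `left.number < right.number`, evaluated sort-merge style.
    *
    * Both inputs are sorted by `number`. For a given left row, the matching right
    * rows are exactly the suffix of the sorted right side whose `number` is strictly
    * greater. Because left rows are visited in ascending order, the start of that
    * suffix only moves forward. Locating it therefore costs O(n + m) after sorting,
    * instead of comparing every pair as a nested loop would.
    */
  def leftOuterLessThanJoin(left: Seq[Record], right: Seq[Record]): Seq[(Record, Option[Record])] = {
    val sortedLeft = left.sortBy(_.number).toVector
    val sortedRight = right.sortBy(_.number).toVector
    var start = 0 // first index in sortedRight whose number > current left number

    sortedLeft.flatMap { l =>
      // Skip right rows that cannot match this or any later (larger) left row.
      while (start < sortedRight.length && sortedRight(start).number <= l.number) start += 1

      val out: Seq[(Record, Option[Record])] =
        if (start == sortedRight.length) Seq((l, None)) // no match: null-extend (left outer)
        else sortedRight.drop(start).map(r => (l, Some(r)))
      out
    }
  }

  /** Counterpart of `.filter('later_id.isNull)`: rows with no strictly larger number. */
  def latest(data: Seq[Record]): Seq[Record] =
    leftOuterLessThanJoin(data, data).collect { case (l, None) => l }

  def main(args: Array[String]): Unit = {
    val data = Seq(Record(1, 30), Record(2, 10), Record(3, 50), Record(4, 20))
    println(latest(data)) // prints Vector(Record(3,50))
  }
}
```

Sorting does not shrink the result. With a `<` condition, a row can still match many rows, so the joined output can approach quadratic size. What the merge scan removes is the pairwise search for where matches start. In the sample query, which has no grouping key, only the row carrying the maximum `number` survives the `later_id.isNull` filter.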