Hi all, I've been hitting this issue and am hoping to get some traction on the JIRA: https://issues.apache.org/jira/browse/SPARK-21492 and PR: https://github.com/apache/spark/pull/23762
If SortMergeJoinScanner doesn't fully consume the iterator it gets from UnsafeExternalRowSorter, the memory that UnsafeExternalSorter acquired from the TaskMemoryManager is never released: one page is held per partition of the unused iterator. This leads to a memory leak, unnecessary spills, and eventually an OutOfMemoryError.

For example, the following fails on a 3.0-SNAPSHOT build:

    ./bin/pyspark --master local[4]

    from pyspark.sql.functions import rand, col

    spark.conf.set("spark.sql.join.preferSortMergeJoin", "true")
    spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)

    r1 = spark.range(1, 1001).select(col("id").alias("timestamp1"))
    r1 = r1.withColumn('value', rand())
    r2 = spark.range(1000, 2001).select(col("id").alias("timestamp2"))
    r2 = r2.withColumn('value2', rand())

    joined = r1.join(r2, r1.timestamp1 == r2.timestamp2, "inner")
    joined = joined.coalesce(1)
    joined.explain()
    joined.show()

Cheers,
Tao
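P.S. For anyone skimming: the failure mode is the generic "resources released only on full iteration" pattern. Below is a minimal, self-contained Python sketch of that pattern, not Spark code; the class and method names (FakeTaskMemoryManager, FakeSorter, cleanup) are hypothetical stand-ins for TaskMemoryManager / UnsafeExternalSorter, just to show why a partially consumed iterator keeps a page pinned and why an explicit cleanup hook fixes it.

    # Simplified analogy, NOT Spark internals: a sorter acquires a "page"
    # and only releases it when its iterator is drained to exhaustion.
    class FakeTaskMemoryManager:
        def __init__(self):
            self.pages_held = 0

        def acquire(self):
            self.pages_held += 1

        def release(self):
            self.pages_held -= 1

    class FakeSorter:
        """Hypothetical stand-in for UnsafeExternalSorter."""
        def __init__(self, mm, rows):
            self.mm = mm
            self.rows = rows
            self.released = False
            mm.acquire()  # holds a memory page while sorted data is live

        def iterator(self):
            for r in sorted(self.rows):
                yield r
            # only reached if the caller drains the iterator completely
            self.cleanup()

        def cleanup(self):
            # explicit release path, safe to call more than once
            if not self.released:
                self.mm.release()
                self.released = True

    mm = FakeTaskMemoryManager()
    sorter = FakeSorter(mm, [3, 1, 2])
    it = sorter.iterator()
    next(it)                   # consume only part of the iterator,
    assert mm.pages_held == 1  # so the page is still held: the "leak"

    sorter.cleanup()           # an explicit cleanup releases the page
    assert mm.pages_held == 0

The PR's fix is analogous: give the join a way to trigger the sorter's cleanup even when it abandons the iterator early, instead of relying on exhaustion.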