Hi all,
I've been hitting this issue and am hoping to get some traction on it at:
https://issues.apache.org/jira/browse/SPARK-21492
and PR: https://github.com/apache/spark/pull/23762

If SortMergeJoinScanner does not fully consume the iterator from
UnsafeExternalRowSorter, the memory that UnsafeExternalSorter
acquired from TaskMemoryManager is not released until the task
completes. This behaves like a memory leak and leads to extra
spills and eventually an OOME. One page is held for every
partition whose iterator is left unconsumed.
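To make the mechanism concrete, here is a toy Python model of it. Everything in it (ToyMemoryManager, ToySortedIterator, inner_merge_join, run_task) is hypothetical and only mimics the roles of TaskMemoryManager, the UnsafeExternalRowSorter iterator and SortMergeJoinScanner; it is not Spark code and not necessarily what the PR does. It assumes each sorted iterator holds exactly one page that it gives back only when fully consumed, that join keys are unique, and that rows are split by simple modulo partitioning rather than Spark's hash partitioner.

```python
_DONE = object()  # sentinel marking an exhausted iterator


class ToyMemoryManager:
    """Hypothetical stand-in for TaskMemoryManager: counts pages held by one task."""

    def __init__(self):
        self.pages_held = 0

    def acquire_page(self):
        self.pages_held += 1

    def release_page(self):
        self.pages_held -= 1


class ToySortedIterator:
    """Hypothetical stand-in for the sorter's iterator.

    It acquires one page up front and releases it only when the last row
    has been read, or when close() is called explicitly.
    """

    def __init__(self, rows, mm):
        self._rows = iter(sorted(rows))
        self._mm = mm
        self._open = True
        mm.acquire_page()
        self._head = next(self._rows, _DONE)
        if self._head is _DONE:
            self.close()  # empty input: nothing to hold on to

    def peek(self):
        return self._head

    def advance(self):
        self._head = next(self._rows, _DONE)
        if self._head is _DONE:
            self.close()  # fully consumed: give the page back

    def close(self):
        if self._open:  # release at most once
            self._open = False
            self._mm.release_page()


def inner_merge_join(left, right, close_on_exit):
    """Inner merge join over two sorted iterators, assuming unique keys.

    Like an inner sort-merge join, it stops as soon as either side is
    exhausted, so the other side may never be fully consumed.
    """
    out = []
    try:
        while left.peek() is not _DONE and right.peek() is not _DONE:
            lk, rk = left.peek(), right.peek()
            if lk < rk:
                left.advance()
            elif lk > rk:
                right.advance()
            else:
                out.append((lk, rk))
                left.advance()
                right.advance()
    finally:
        if close_on_exit:
            # Explicitly release both sides once the scan stops.
            left.close()
            right.close()
    return out


def run_task(num_partitions, close_on_exit):
    """Join every partition inside one task (as with coalesce(1)); return pages still held."""
    mm = ToyMemoryManager()
    for p in range(num_partitions):
        # Modulo partitioning of the repro's key ranges (not Spark's hash partitioner).
        left_keys = [k for k in range(1, 1001) if k % num_partitions == p]
        right_keys = [k for k in range(1000, 2001) if k % num_partitions == p]
        inner_merge_join(ToySortedIterator(left_keys, mm),
                         ToySortedIterator(right_keys, mm),
                         close_on_exit)
    return mm.pages_held


print(run_task(200, close_on_exit=False))  # 200: one page per partition stays held
print(run_task(200, close_on_exit=True))   # 0
```

In this model the left side (keys up to 1000) always runs out first, so the right side's page is never returned and the count grows by one per partition. Closing both sides when the scan stops, as close_on_exit=True does here, is one way to release the pages early within the model.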

For example, the following fails on 3.0-SNAPSHOT. Start a shell with

./bin/pyspark --master local[4]

and run:

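from pyspark.sql.functions import rand, col

# Force a sort-merge join: prefer SMJ and disable broadcast joins.
spark.conf.set("spark.sql.join.preferSortMergeJoin", "true")
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)

# The key ranges overlap only at 1000, so in almost every join partition
# one side runs out first and the other side's sorted iterator is never
# fully consumed.
r1 = spark.range(1, 1001).select(col("id").alias("timestamp1"))
r1 = r1.withColumn("value", rand())
r2 = spark.range(1000, 2001).select(col("id").alias("timestamp2"))
r2 = r2.withColumn("value2", rand())
joined = r1.join(r2, r1.timestamp1 == r2.timestamp2, "inner")
# coalesce(1) runs every join partition inside a single task, so the
# pages held by the unconsumed iterators accumulate in that one task.
joined = joined.coalesce(1)
joined.explain()
joined.show()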


Cheers,

Tao
