Hi, I have two tables, t1 and t2. Both are bucketed into 32 buckets and sorted on user_id.
When I use a regular equality join, Spark triggers the expected Sort Merge Bucket Join. Please see my code and the physical plan below.

from pyspark.sql import SparkSession


def _gen_spark_session(job_name: str) -> SparkSession:
    return (
        SparkSession
        .builder
        .enableHiveSupport()
        .appName(f'Job: {job_name}')
        .config(
            key='spark.sql.sources.bucketing.enabled',
            value='true',
        ).config(
            key='spark.sql.legacy.bucketedTableScan.outputOrdering',
            value='true',
        ).config(
            key='spark.hadoop.mapreduce.fileoutputcommitter'
                '.algorithm.version',
            value='2',
        ).config(
            key='spark.speculation',
            value='false',
        ).getOrCreate()
    )


def run() -> None:
    spark = _gen_spark_session(job_name='TEST')
    joined_df = spark.sql('''
        SELECT COALESCE(t1.user_id, t2.user_id) AS user_id
        FROM t1
        FULL OUTER JOIN t2
            ON t1.user_id = t2.user_id
    ''')
    joined_df.explain(True)


if __name__ == '__main__':
    run()

Physical Plan:

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- Project [coalesce(user_id#0L, user_id#6L) AS user_id#12L]
   +- SortMergeJoin [user_id#0L], [user_id#6L], FullOuter
      :- FileScan parquet Batched: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex[..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<user_id:bigint>, SelectedBucketsCount: 32 out of 32
      +- FileScan parquet Batched: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex[..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<user_id:bigint>, SelectedBucketsCount: 32 out of 32

As you can see, there is no Exchange or Sort before the SortMergeJoin step. However, if I switch to a null-safe equality (eqNullSafe, i.e. <=>) as the join condition, Spark no longer triggers the Sort Merge Bucket Join.

def run() -> None:
    spark = _gen_spark_session(job_name='TEST')
    joined_df = spark.sql('''
        SELECT COALESCE(t1.user_id, t2.user_id) AS user_id
        FROM t1
        FULL OUTER JOIN t2
            ON t1.user_id <=> t2.user_id
    ''')
    joined_df.explain(True)

The equality operator is the only thing I changed between the two runs.
Physical Plan:

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- Project [coalesce(user_id#0L, user_id#6L) AS user_id#12L]
   +- SortMergeJoin [coalesce(user_id#0L, 0), isnull(user_id#0L)], [coalesce(user_id#6L, 0), isnull(user_id#6L)], FullOuter
      :- Sort [coalesce(user_id#0L, 0) ASC NULLS FIRST, isnull(user_id#0L) ASC NULLS FIRST], false, 0
      :  +- Exchange hashpartitioning(coalesce(user_id#0L, 0), isnull(user_id#0L), 1000), ENSURE_REQUIREMENTS, [id=#23]
      :     +- FileScan parquet [user_id#0L] Batched: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex[..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<user_id:bigint>
      +- Sort [coalesce(user_id#6L, 0) ASC NULLS FIRST, isnull(user_id#6L) ASC NULLS FIRST], false, 0
         +- Exchange hashpartitioning(coalesce(user_id#6L, 0), isnull(user_id#6L), 1000), ENSURE_REQUIREMENTS, [id=#26]
            +- FileScan parquet [user_id#6L] Batched: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex[..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<user_id:bigint>

If I read this correctly, is eqNullSafe just syntactic sugar that rewrites each join key as a COALESCE to 0 plus an isnull flag? Does Spark consider potential key collisions in this case (e.g. if I have user_id = 0 in my original dataset)? I know that applying a UDF to the join condition breaks the bucketing, hence the rebucketing and resorting. However, I'm wondering whether, in this special case, we can make the bucketed join work as well.

Thanks,
Thomas
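P.S. To reason about the collision question myself, I modeled the rewritten join keys from the physical plan in plain Python (my own sketch of what the plan shows, not Spark internals). The plan pairs coalesce(user_id, 0) with isnull(user_id), and in this model a real user_id = 0 stays distinct from NULL, so I may be answering my own collision question, but I'd like to confirm:

```python
# Plain-Python model of the join key Spark generates for <=>,
# as shown in the plan: [coalesce(user_id, 0), isnull(user_id)].
# This is my own sketch for reasoning, not Spark's actual code.

def null_safe_key(user_id):
    """Mimic the (coalesce(user_id, 0), isnull(user_id)) key pair."""
    return (user_id if user_id is not None else 0, user_id is None)

# A real user_id of 0 and a NULL both coalesce to 0 ...
assert null_safe_key(0)[0] == 0
assert null_safe_key(None)[0] == 0

# ... but the isnull flag keeps the composite keys distinct:
assert null_safe_key(0) == (0, False)
assert null_safe_key(None) == (0, True)
assert null_safe_key(0) != null_safe_key(None)
```

So, if this model is right, the isnull component prevents the collision, but the rewritten keys no longer match the columns the tables are bucketed and sorted on, which would explain the Exchange and Sort.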