Hi,

I have two tables t1 and t2. Both are bucketed and sorted on user_id into
32 buckets.

When I use a regular equality join, Spark triggers the expected Sort Merge
Bucket Join. Please see my code and the physical plan below.

from pyspark.sql import SparkSession


def _gen_spark_session(job_name: str) -> SparkSession:
    return (
        SparkSession
        .builder
        .enableHiveSupport()
        .appName(f'Job: {job_name}')
        .config(
            key='spark.sql.sources.bucketing.enabled',
            value='true',
        ).config(
            key='spark.sql.legacy.bucketedTableScan.outputOrdering',
            value='true',
        ).config(
            key='spark.hadoop.mapreduce.fileoutputcommitter'
                '.algorithm.version',
            value='2',
        ).config(
            key='spark.speculation',
            value='false',
        ).getOrCreate()
    )


def run() -> None:
    spark = _gen_spark_session(job_name='TEST')
    joined_df = spark.sql('''
        SELECT
            COALESCE(t1.user_id, t2.user_id) AS user_id
        FROM
            t1 FULL OUTER JOIN t2
        ON t1.user_id = t2.user_id
    ''')
    joined_df.explain(True)


if __name__ == '__main__':
    run()


Physical Plan:

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- Project [coalesce(user_id#0L, user_id#6L) AS user_id#12L]
   +- SortMergeJoin [user_id#0L], [user_id#6L], FullOuter
      :- FileScan parquet Batched: true, DataFilters: [], Format:
Parquet, Location: InMemoryFileIndex[..., PartitionFilters: [],
PushedFilters: [], ReadSchema: struct<user_id:bigint>,
SelectedBucketsCount: 32 out of 32
      +- FileScan parquet Batched: true, DataFilters: [], Format:
Parquet, Location: InMemoryFileIndex[..., PartitionFilters: [],
PushedFilters: [], ReadSchema: struct<user_id:bigint>,
SelectedBucketsCount: 32 out of 32

As you can see, there is no Exchange or Sort before the SortMergeJoin
step.

However, if I switch to using eqNullSafe as the join condition, Spark
doesn't trigger the Sort Merge Bucket Join any more.
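For context, my understanding of the `<=>` operator (null-safe equality),
sketched in plain Python (my own illustration, not Spark code):

```python
def eq_null_safe(a, b):
    """Null-safe equality as I understand SQL's <=> operator:
    NULL <=> NULL is true, NULL <=> x is false, otherwise plain equality."""
    if a is None or b is None:
        return a is None and b is None
    return a == b


# Truth table matching the SQL semantics:
assert eq_null_safe(None, None) is True   # NULL <=> NULL -> true
assert eq_null_safe(None, 1) is False     # NULL <=> 1    -> false
assert eq_null_safe(1, 1) is True         # 1 <=> 1       -> true
```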
def run() -> None:
    spark = _gen_spark_session(job_name='TEST')
    joined_df = spark.sql('''
        SELECT
            COALESCE(t1.user_id, t2.user_id) AS user_id
        FROM
            t1 FULL OUTER JOIN t2
        ON t1.user_id <=> t2.user_id
    ''')
    joined_df.explain(True)


The equality operator is the only thing I changed between the two runs.

Physical Plan:

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- Project [coalesce(user_id#0L, user_id#6L) AS user_id#12L]
   +- SortMergeJoin [coalesce(user_id#0L, 0), isnull(user_id#0L)],
[coalesce(user_id#6L, 0), isnull(user_id#6L)], FullOuter
      :- Sort [coalesce(user_id#0L, 0) ASC NULLS FIRST,
isnull(user_id#0L) ASC NULLS FIRST], false, 0
      :  +- Exchange hashpartitioning(coalesce(user_id#0L, 0),
isnull(user_id#0L), 1000), ENSURE_REQUIREMENTS, [id=#23]
      :     +- FileScan parquet [user_id#0L] Batched: true,
DataFilters: [], Format: Parquet, Location: InMemoryFileIndex[...,
PartitionFilters: [], PushedFilters: [], ReadSchema:
struct<user_id:bigint>
      +- Sort [coalesce(user_id#6L, 0) ASC NULLS FIRST,
isnull(user_id#6L) ASC NULLS FIRST], false, 0
         +- Exchange hashpartitioning(coalesce(user_id#6L, 0),
isnull(user_id#6L), 1000), ENSURE_REQUIREMENTS, [id=#26]
            +- FileScan parquet [user_id#6L] Batched: true,
DataFilters: [], Format: Parquet, Location: InMemoryFileIndex[...,
PartitionFilters: [], PushedFilters: [], ReadSchema:
struct<user_id:bigint>


If I read this correctly, eqNullSafe is just syntactic sugar that rewrites
the join key into the pair (COALESCE(user_id, 0), ISNULL(user_id))? Does
Spark guard against potential key collisions in this case (e.g. if I have
a user_id = 0 in my original dataset)?
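To make my reading concrete, here is a pure-Python sketch (my illustration,
not Spark internals) of the composite key the plan above seems to build:

```python
def null_safe_join_key(user_id):
    # Mirrors the key pair from the physical plan:
    # (coalesce(user_id, 0), isnull(user_id)).
    return (user_id if user_id is not None else 0, user_id is None)


# A literal user_id = 0 and a NULL user_id share the first component,
# but the isnull flag keeps the composite keys distinct:
assert null_safe_join_key(0) == (0, False)
assert null_safe_join_key(None) == (0, True)
assert null_safe_join_key(0) != null_safe_join_key(None)
```

If that is right, the ISNULL flag would seem to keep a real 0 distinct from
a NULL, but I'd like to confirm.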

I know that applying a UDF to the join condition breaks the bucketing,
hence the re-partitioning and re-sorting. However, I'm wondering whether,
in this special case, we can make the bucketed join work as well. Thanks.

Thomas
