I am reading two datasets that I saved to disk with the ```bucketBy```
option, on the same key and with the same number of buckets. When I read
them back and join them, the join should not result in a shuffle.

However, that is not what I am seeing.

*The following code reproduces the behavior:*

import random

from pyspark.sql import SparkSession
from pyspark.sql import functions as F

# Disable automatic broadcast joins so the join is not planned as a broadcast join.
spark = SparkSession.builder.config("spark.sql.autoBroadcastJoinThreshold", "-1").getOrCreate()

# Two small datasets sharing the join key column `a` (values 0..4, each appearing twice).
data1 = [(i, random.randint(1, 5), random.randint(1, 5)) for t in range(2) for i in range(5)]
data2 = [(i, random.randint(1, 5), random.randint(1, 5)) for t in range(2) for i in range(5)]
df1 = spark.createDataFrame(data1, schema='a int,b int,c int')
df2 = spark.createDataFrame(data2, schema='a int,b int,c int')

parquet_path1 = './bucket_test_parquet1'
parquet_path2 = './bucket_test_parquet2'

# Write both datasets bucketed on `a` into 5 buckets, each under its own table name.
df1.write.bucketBy(5,"a").format("parquet").saveAsTable('df1',path=parquet_path1,mode='overwrite')
df2.write.bucketBy(5,"a").format("parquet").saveAsTable('df2',path=parquet_path2,mode='overwrite')

# Read the files back by path and expose them as temporary views.
read_parquet1 = spark.read.format("parquet").load(parquet_path1)
read_parquet1.createOrReplaceTempView("read_parquet1")
read_parquet1 = spark.sql("SELECT * from read_parquet1")

read_parquet2 = spark.read.format("parquet").load(parquet_path2)
read_parquet2.createOrReplaceTempView("read_parquet2")
read_parquet2 = spark.sql("SELECT * from read_parquet2")

# Join on the bucketing column and print the physical plan.
read_parquet1.join(read_parquet2,on='a').explain()

*The output I am getting is:*

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- Project [a#24, b#25, c#26, b#34, c#35]
   +- SortMergeJoin [a#24], [a#33], Inner
      :- Sort [a#24 ASC NULLS FIRST], false, 0
      :  +- Exchange hashpartitioning(a#24, 200), ENSURE_REQUIREMENTS, [id=#61]
      :     +- Filter isnotnull(a#24)
      :        +- FileScan parquet [a#24,b#25,c#26] Batched: true, DataFilters: [isnotnull(a#24)], Format: Parquet, Location: InMemoryFileIndex(1 paths)[file:/home/nitin/pymonsoon/bucket_test_parquet1], PartitionFilters: [], PushedFilters: [IsNotNull(a)], ReadSchema: struct<a:int,b:int,c:int>
      +- Sort [a#33 ASC NULLS FIRST], false, 0
         +- Exchange hashpartitioning(a#33, 200), ENSURE_REQUIREMENTS, [id=#62]
            +- Filter isnotnull(a#33)
               +- FileScan parquet [a#33,b#34,c#35] Batched: true, DataFilters: [isnotnull(a#33)], Format: Parquet, Location: InMemoryFileIndex(1 paths)[file:/home/nitin/pymonsoon/bucket_test_parquet2], PartitionFilters: [], PushedFilters: [IsNotNull(a)], ReadSchema: struct<a:int,b:int,c:int>


*The plan clearly shows a hashpartitioning Exchange on both sides of the
join. Could someone help me understand the utility of ```bucketBy```?*
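
For anyone who wants to check a plan for shuffles without reading it by eye, here is a minimal sketch. It assumes the plan text has been copied from the ```explain()``` output into a string. The helper name ```shuffle_exchanges``` and the variable ```sample_plan``` are made up for this illustration and are not part of Spark.

```python
import re
from typing import List


def shuffle_exchanges(plan_text: str) -> List[str]:
    """Return the hashpartitioning(...) expression of every Exchange node in a plan string.

    Hypothetical helper for illustration only; it does plain text matching
    on a plan that was pasted from explain() output.
    """
    pattern = re.compile(r"Exchange (hashpartitioning\([^)]*\))")
    return pattern.findall(plan_text)


# Shortened copy of the plan shown above (only the join, sort and exchange nodes).
sample_plan = """\
+- SortMergeJoin [a#24], [a#33], Inner
   :- Sort [a#24 ASC NULLS FIRST], false, 0
   :  +- Exchange hashpartitioning(a#24, 200), ENSURE_REQUIREMENTS, [id=#61]
   +- Sort [a#33 ASC NULLS FIRST], false, 0
      +- Exchange hashpartitioning(a#33, 200), ENSURE_REQUIREMENTS, [id=#62]
"""

found = shuffle_exchanges(sample_plan)
print(found)  # one entry per shuffle; an empty list would mean no hash shuffle
```

For the plan in this post the helper reports two entries, one per side of the join, which is the shuffle I expected ```bucketBy``` to avoid.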
