This is probably because your data size is well under the broadcast-join
threshold, so at the planning phase Spark decides to do a broadcast join
instead of a shuffle join that could take advantage of dynamic partition
pruning. For testing like this you can always disable broadcast joins with
spark.sql.autoBroadcastJoinThreshold=-1
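
For example, in spark-shell you can turn broadcast joins off for the current
session (equivalently, pass --conf spark.sql.autoBroadcastJoinThreshold=-1
when launching):

// Disable broadcast joins for this session; the planner will then fall back
// to a shuffle-based join such as sort-merge join.
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "-1")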

In a real-data scenario the joined tables would probably be much larger than
the default threshold (10 MB) and trigger dynamic partition pruning, although
I can see it may be beneficial to implement dynamic partition pruning for
broadcast joins as well...
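
One more thing worth checking in your test: DPP can only skip partitions that
actually exist on disk, and the sales table in your snippet is written
unpartitioned. A minimal sketch of the write, reusing your salesSeq and
partitioning on the join key:

// Write the fact table partitioned by the join column pId, so the scan has
// partition directories that a dynamic filter can actually prune.
sc.parallelize(salesSeq).toDF("pId", "q")
  .write.mode("overwrite")
  .partitionBy("pId")
  .parquet("hdfs://test/spark-optimization/sales.parquet")

When DPP does kick in, the scan node in the explain() output shows a
dynamicpruningexpression(...) under PartitionFilters, which is a quick way to
confirm it.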


> On Dec 4, 2021, at 8:41 AM, Mohamadreza Rostami 
> <mohamadrezarosta...@gmail.com> wrote:
> 
> Hello all,
> 
> We use Apache Spark 3.2.0, and our data is stored on Apache Hadoop in Parquet 
> format. To speed up our queries, we are trying different scenarios. We found 
> that Spark supports dynamic partition pruning in versions after 3.0.0. So, to 
> test the improvement from the DPP feature, we defined two tables, sales and 
> products, and a query. You can find the code that initializes the environment 
> here:
> # First Run
> val salesSeq = Seq((1, 1), (1, 2), (1, 3), (1, 4), (1, 5), (1, 6), (1, 7), 
> (1, 8), (2, 1), (2, 2), (2, 3), (2, 4), (2, 5), (2, 6), (2, 7), (2, 8))
> val productSeq = Seq((1, "A"), (2, "B"))
> sc.parallelize(salesSeq).toDF("pId","q").write.mode("overwrite").parquet("hdfs://test/spark-optimization/sales.parquet")
> sc.parallelize(productSeq).toDF("Id","name").write.mode("overwrite").parquet("hdfs://test/spark-optimization/products.parquet")
> 
> Then we run another Scala snippet to execute the query. You can find the 
> second-run code here:
> # Second Run
> val salesDF = spark.read.parquet("hdfs://test/spark-optimization/sales.parquet")
> val productDF = spark.read.parquet("hdfs://test/spark-optimization/products.parquet")
> salesDF.createOrReplaceTempView("sales")
> productDF.createOrReplaceTempView("products")
> sql("SELECT * FROM sales JOIN products ON sales.pId = products.id AND products.name = 'A'").explain()
> 
> Based on the DPP feature, we expect the filter to be pushed down to the file 
> scan level, preventing unnecessary partitions from being read. See the 
> following picture:
> <13551396-1591032620843.png>
> 
> 
> But instead, Spark does not push the filter down to the file scan layer and 
> uses a broadcast join without filtering partitions. See the following picture:
> <13551394-1591032607773.png>
> 
> 
> To better understand the situation, please have a look at this link: 
> https://dzone.com/articles/dynamic-partition-pruning-in-spark-30
> We checked that the DPP and adaptive query execution features are enabled in 
> our Spark cluster. So my question is: how can I debug this and find the root 
> cause of the problem?
> 
> 
> Cheers,
