This is probably because your data size is well under the broadcast join threshold, so at the planning phase Spark decides to do a broadcast join instead of a shuffle join, which could otherwise take advantage of dynamic partition pruning. For testing like this you can always disable that with spark.sql.autoBroadcastJoinThreshold=-1.
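For reference, here is a minimal sketch of setting that threshold from a running session (the SQL variant below is an equivalent alternative; both use the standard Spark configuration key):

```scala
// Disable automatic broadcast joins so the planner falls back to a
// shuffle join, which is the path that can apply dynamic partition
// pruning on the partitioned side of the join.
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)

// Equivalent SQL form:
// spark.sql("SET spark.sql.autoBroadcastJoinThreshold=-1")
```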
In a real data scenario the size of the joined tables would probably be much larger than the default (10 MB) and trigger dynamic partition pruning, although I can see it may be beneficial to implement dynamic partition pruning for broadcast joins as well.

> On Dec 4, 2021, at 8:41 AM, Mohamadreza Rostami
> <mohamadrezarosta...@gmail.com> wrote:
>
> Hello all,
>
> We use Apache Spark 3.2.0 and our data is stored on Apache Hadoop in
> Parquet format. To speed up our queries, we tried different scenarios. We
> found out that Spark supports dynamic partition pruning in versions after
> 3.0.0. So, to test the improvement from the DPP feature, we defined two
> tables, sales and products, and a query. You can find the code that
> initializes the environment here:
>
> # First Run
> val salesSeq = Seq((1, 1), (1, 2), (1, 3), (1, 4), (1, 5), (1, 6), (1, 7),
>   (1, 8), (2, 1), (2, 2), (2, 3), (2, 4), (2, 5), (2, 6), (2, 7), (2, 8))
> val productSeq = Seq((1, "A"), (2, "B"))
> sc.parallelize(salesSeq).toDF("pId", "q").write.mode("overwrite")
>   .parquet("hdfs://test/spark-optimization/sales.parquet")
> sc.parallelize(productSeq).toDF("Id", "name").write.mode("overwrite")
>   .parquet("hdfs://test/spark-optimization/products.parquet")
>
> Then we run another Scala program to run the query.
> You can find the second-run file here:
>
> # Second Run
> val salesDF =
>   spark.read.parquet("hdfs://test/spark-optimization/sales.parquet")
> val productDF =
>   spark.read.parquet("hdfs://test/spark-optimization/products.parquet")
> salesDF.createOrReplaceTempView("sales")
> productDF.createOrReplaceTempView("products")
> sql("SELECT * FROM sales JOIN products ON sales.pId = products.id AND products.name = 'A'").explain()
>
> Based on the DPP feature, we expected filters to be pushed down to the
> file scan level, preventing unnecessary partitions from being read. See
> the following picture:
> <13551396-1591032620843.png>
>
> But instead, Spark does not push filters down to the file scan layer and
> uses a broadcast join without filtering partitions. See the following
> picture:
> <13551394-1591032607773.png>
>
> To better understand the situation, please have a look at this link:
> https://dzone.com/articles/dynamic-partition-pruning-in-spark-30
> We checked that the DPP and adaptive query execution features are enabled
> in our Spark cluster. So my question is: how can I debug and find the root
> cause of this problem?
>
> Cheers,
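One more thing worth noting: dynamic partition pruning can only skip Hive-style partition directories, so the fact table has to be written with partitionBy on the join key; the writes in the first run above don't do that. A sketch of the adjusted setup, reusing the paths and column names from the thread (the partitionBy call is my assumption about the intended layout, not something the original code had):

```scala
// Disable broadcast joins for the test, then write the fact table
// partitioned by the join key so DPP has partitions to prune.
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)

sc.parallelize(salesSeq).toDF("pId", "q")
  .write.mode("overwrite")
  .partitionBy("pId") // produces pId=1/, pId=2/ directories on HDFS
  .parquet("hdfs://test/spark-optimization/sales.parquet")

// After re-reading the table and re-registering the views, the sales
// scan in the physical plan should show a "dynamicpruning" expression
// in its PartitionFilters if DPP kicked in.
sql("SELECT * FROM sales JOIN products ON sales.pId = products.id AND products.name = 'A'").explain()
```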