Thank you for your response. The point about being "under the broadcast join 
threshold" was a good one. We also tested on real data sets with tables that 
are terabytes in size, but there Spark uses a sort-merge join without DPP. 
Also, you said that DPP is not implemented for broadcast joins? If so, I wonder 
how DPP can be beneficial without broadcasting the index tables (the tables 
that provide the dynamic filters). The implementation of DPP that I have in 
mind is something like:
1. Spark reads the dynamic filters from the index tables.
2. Spark broadcasts the dynamic filters to all nodes.
3. Executors push the dynamic filters down to the file scan layer and run the query.
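
As a rough sketch of that mental model, here is the same idea written with plain Scala collections. This is only an illustration of the three steps above, not Spark's actual implementation; the object name, the method names, and the in-memory "partitions" are all made up for the example.

```scala
// Conceptual model only: plain Scala collections stand in for tables, no Spark involved.
object DppSketch {
  // Hypothetical fact table, stored as one "partition" per pId value.
  val salesPartitions: Map[Int, Seq[(Int, Int)]] = Map(
    1 -> (1 to 8).map(q => (1, q)),
    2 -> (1 to 8).map(q => (2, q))
  )

  // Hypothetical index (dimension) table.
  val products: Seq[(Int, String)] = Seq((1, "A"), (2, "B"))

  // Step 1: read the dynamic filter, i.e. the join keys that survive the predicate.
  def dynamicFilter(name: String): Set[Int] =
    products.collect { case (id, n) if n == name => id }.toSet

  // Steps 2 and 3: the key set is handed to the scan, which skips every
  // partition whose key is not in the set.
  def prunedScan(keys: Set[Int]): Seq[(Int, Int)] =
    salesPartitions.collect { case (pId, rows) if keys.contains(pId) => rows }.flatten.toSeq

  def main(args: Array[String]): Unit = {
    val keys = dynamicFilter("A")
    val rows = prunedScan(keys)
    println(s"keys: $keys, rows scanned: ${rows.size}")
  }
}
```

In this toy version only the partition for pId = 1 is ever touched, which is the effect I expected to see at the file scan level.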

Thank you so much for your attention and participation.

> On Dec 4, 2021, at 20:44, Russell Spitzer <russell.spit...@gmail.com> wrote:
> 
> This is probably because your data size is well under the broadcastJoin 
> threshold so at the planning phase it decides to do a BroadcastJoin instead 
> of a Join which could take advantage of dynamic partition pruning. For 
> testing like this you can always disable that with 
> spark.sql.autoBroadcastJoinThreshold=-1
> 
> In a real data scenario the size of the join tables would probably be much 
> larger than the default (10 MB) and trigger dynamic partition pruning, 
> although I can see it may be beneficial to implement dynamic partition 
> pruning for broadcast joins as well...
> 
> 
>> On Dec 4, 2021, at 8:41 AM, Mohamadreza Rostami 
>> <mohamadrezarosta...@gmail.com> wrote:
>> 
>> Hello all,
>> 
>> We use Apache Spark 3.2.0, and our data is stored on Apache Hadoop in Parquet 
>> format. To speed up our queries, we are trying different scenarios. We found 
>> that Spark supports dynamic partition pruning (DPP) from version 3.0.0 
>> onward. So, to test the improvement from DPP, we defined two tables, sales 
>> and products, and a query. Here is the code that initializes the 
>> environment:
>> # First Run
>> val salesSeq = Seq((1, 1), (1, 2), (1, 3), (1, 4), (1, 5), (1, 6), (1, 7), 
>> (1, 8), (2, 1), (2, 2), (2, 3), (2, 4), (2, 5), (2, 6), (2, 7), (2, 8))
>> val productSeq = Seq((1, "A"), (2, "B"))
>> sc.parallelize(salesSeq).toDF("pId", "q").write.mode("overwrite").parquet("hdfs://test/spark-optimization/sales.parquet")
>> sc.parallelize(productSeq).toDF("Id", "name").write.mode("overwrite").parquet("hdfs://test/spark-optimization/products.parquet")
>> 
>> Then we run another Scala script to execute the query. Here is the second 
>> file:
>> # Second Run
>> val salesDF = spark.read.parquet("hdfs://test/spark-optimization/sales.parquet")
>> // The products view must read products.parquet, not sales.parquet.
>> val productDF = spark.read.parquet("hdfs://test/spark-optimization/products.parquet")
>> salesDF.createOrReplaceTempView("sales")
>> productDF.createOrReplaceTempView("products")
>> sql("SELECT * FROM sales JOIN products ON sales.pId = products.id and products.name = 'A'").explain()
>> 
>> Based on the DPP feature, we expected the filters to be pushed down to the 
>> file scan level, which prevents reading unnecessary partitions. See the 
>> following picture:
>> <13551396-1591032620843.png>
>> 
>> 
>> But instead, Spark does not push the filters down to the file scan layer and 
>> uses a broadcast join without filtering partitions. See the following picture:
>> <13551394-1591032607773.png>
>> 
>> 
>> To better understand the situation, please have a look at this link: 
>> https://dzone.com/articles/dynamic-partition-pruning-in-spark-30
>> We checked that the DPP and adaptive query execution features are enabled in 
>> our Spark cluster. So my question is: how can I debug and find the root 
>> cause of this problem?
>> 
>> 
>> Cheers,
> 
