Re: [Spark CORE][Spark SQL][Advanced]: Why dynamic partition pruning optimization does not work in this scenario?

2021-12-04 Thread Mohamadreza Rostami
Thank you for your response. "Under the broadcast join threshold" was a good point. 
We tested it on real data sets with tables that are TBs in size, but there Spark 
uses a sort-merge join without DPP. Also, you said that DPP is not implemented for 
broadcast joins? Then I wonder how DPP can be beneficial without broadcasting the 
index tables (the tables containing the dynamic filters), because the 
implementation I have in mind for DPP is something like this (a rough sketch of 
these steps follows the list):
1. Spark reads the dynamic filter values from the index tables.
2. It broadcasts the dynamic filters to all nodes.
3. Executors push the dynamic filters down to the file scan layer and run the query.
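
For concreteness, here is a rough hand-rolled version of those three steps. It is 
only a sketch: it assumes a spark-shell (so the toDF/column implicits are in scope) 
and uses the sales/products temp views from the code quoted below. When DPP itself 
applies, the optimizer does the equivalent automatically and the file scan of the 
partitioned table shows a dynamicpruningexpression(...) entry in its PartitionFilters.

// 1. Read the dynamic filter values from the index (dimension) table.
val keys = spark.table("products").filter($"name" === "A").select($"id").as[Int].collect()

// 2. The collected values travel to the executors with the plan/task closure
//    (conceptually the "broadcast" step).
// 3. Push them down as a filter on the fact table; if sales were partitioned by
//    pId, this would turn into a partition filter and skip whole partitions.
spark.table("sales").filter($"pId".isin(keys: _*)).show()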

Thank you so much for your attention and participation.

> On Dec 4, 2021, at 20:44, Russell Spitzer  wrote:
> 
> This is probably because your data size is well under the broadcast join 
> threshold, so at the planning phase it decides to do a broadcast join instead 
> of a join that could take advantage of dynamic partition pruning. For testing 
> like this you can always disable that with 
> spark.sql.autoBroadcastJoinThreshold=-1
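
(For testing, that can be set per session; a one-line sketch:)

// Sketch: disable the size-based broadcast threshold for the current session only,
// so the planner cannot pick a broadcast hash join on size grounds.
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "-1")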
> 
> In a real-data scenario the size of the join tables would probably be much 
> larger than the default (10mb) and trigger dynamic partition pruning, 
> although I can see it may be beneficial to implement dynamic partition 
> pruning for broadcast joins as well...
> 
> 
>> On Dec 4, 2021, at 8:41 AM, Mohamadreza Rostami 
>> <mohamadrezarosta...@gmail.com> wrote:
>> 
>> Hello all,
>> 
>> We use Apache Spark 3.2.0 and our data is stored on Apache Hadoop in parquet 
>> format. To speed up our queries, we are trying different scenarios. We found 
>> out that Spark supports dynamic partition pruning in versions after 3.0.0. So, 
>> to test the improvement from the DPP feature, we defined two tables, sales and 
>> products, and a query. You can find the code that initializes the environment 
>> here:
>> # First Run
>> val salesSeq = Seq((1, 1), (1, 2), (1, 3), (1, 4), (1, 5), (1, 6), (1, 7), 
>> (1, 8), (2, 1), (2, 2), (2, 3), (2, 4), (2, 5), (2, 6), (2, 7), (2, 8))
>> val productSeq = Seq((1, "A"), (2, "B"))
>> sc.parallelize(salesSeq).toDF("pId","q").write.mode("overwrite").parquet("hdfs://test/spark-optimization/sales.parquet")
>> sc.parallelize(productSeq).toDF("Id","name").write.mode("overwrite").parquet("hdfs://test/spark-optimization/products.parquet")
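
One note on the layout: DPP can only skip whole partitions, so it assumes the fact 
table is written partitioned on the join key, while the write above produces 
unpartitioned parquet. A minimal partitioned variant might look like this (a sketch 
only; the output path is hypothetical):

// Same data, but partitioned by the join key so partition pruning has
// something to prune.
sc.parallelize(salesSeq).toDF("pId", "q")
  .write
  .mode("overwrite")
  .partitionBy("pId")
  .parquet("hdfs://test/spark-optimization/sales_partitioned.parquet")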
>> 
>> Then we run another Scala snippet to execute the query. You can find the second 
>> run file here:
>> # Second Run
>> val salesDF = spark.read.parquet("hdfs://test/spark-optimization/sales.parquet")
>> val productDF = spark.read.parquet("hdfs://test/spark-optimization/products.parquet")
>> salesDF.createOrReplaceTempView("sales")
>> productDF.createOrReplaceTempView("products")
>> sql("SELECT * FROM sales JOIN products ON sales.pId = products.id and products.name = 'A'").explain()
>> 
>> Based on the DPP feature, we expected the filter to be pushed down to the file 
>> scan level, preventing unnecessary partitions from being read. See the following picture:
>> <13551396-1591032620843.png>
>> 
>> 
>> But instead, Spark does not push the filter down to the file scan layer and 
>> uses a broadcast join without filtering partitions. See the following picture:
>> <13551394-1591032607773.png>
>> 
>> 
>> To better understand the situation, please have a look at this link:
>> https://dzone.com/articles/dynamic-partition-pruning-in-spark-30
>> We checked that the DPP and adaptive query execution features are enabled in 
>> our Spark cluster. So my question is: how can I debug and find the root cause 
>> of this problem?
>> 
>> 
>> Cheers,
> 



Re: Scheduling Time > Processing Time

2021-06-20 Thread Mohamadreza Rostami
Hi,
I think it's because of the locality wait timeout. For streaming jobs you should 
decrease the locality wait.
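
The setting I mean is spark.locality.wait (3s by default). A minimal sketch of 
lowering it when building the session; "500ms" is only an illustrative value, and 
it can equally be passed with --conf on spark-submit:

import org.apache.spark.sql.SparkSession

// Sketch: shorten the delay-scheduling wait so micro-batch tasks fall back to
// less-local executors quickly instead of idling while waiting for a local slot.
val spark = SparkSession.builder()
  .appName("streaming-job")                 // hypothetical app name
  .config("spark.locality.wait", "500ms")   // default is 3s
  .getOrCreate()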


Sent from my iPhone

> On Jun 20, 2021, at 11:55 PM, Siva Tarun Ponnada  wrote:
> 
> 
> Hi Team,
>  I have a spark streaming job which I am running on a single-node 
> cluster. I often see that scheduling time > processing time in the streaming 
> statistics a few minutes after my application starts up. What does that 
> mean? Should I increase the number of receivers? 
> 
> 
> 
> Regards
> Taun




Re: Benchmarks for Many-to-Many Joins

2021-04-22 Thread Mohamadreza Rostami
What kind of benchmark do you need to run? I mean, do you want to benchmark Spark's 
many-to-many joins, or do you want to benchmark another aspect of Spark or the 
cluster (such as network or disk)?
If you only want to exercise a many-to-many join, you can use a cross join or 
repartition the data by another key. Both of these run in a many-to-many manner 
across the Spark cluster, as sketched below.
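
A small sketch of both options with made-up data (spark-shell assumed, so the toDF 
implicits are in scope):

// Made-up DataFrames; the key k is deliberately non-unique on both sides.
val left  = Seq((1, "a"), (1, "b"), (2, "c")).toDF("k", "l")
val right = Seq((1, "x"), (1, "y"), (2, "z")).toDF("k", "r")

// Option 1: explicit cross join, every left row paired with every right row.
left.crossJoin(right).count()   // 9 rows

// Option 2: equi-join on a non-unique key; each k=1 row matches two rows on the
// other side, which is a many-to-many join.
left.join(right, "k").count()   // 5 rows (2*2 for k=1, 1*1 for k=2)

// Repartitioning by another key likewise forces an all-to-all shuffle.
left.repartition(org.apache.spark.sql.functions.col("l"))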

> On Ordibehesht 1, 1400 AP, at 21:25, Dhruv Kumar  
> wrote:
> 
> Hi
> 
> I wanted to ask if anyone knows any datasets or benchmarks which I can use 
> for evaluating many-to-many joins (as depicted in the attached snapshot). I 
> looked at the TPC-H and TPC-DS benchmarks, but surprisingly they mostly have 
> one-to-many joins, so I could not get much help there.
> 
> 
> 
> 
> 
> Thanks
> Dhruv
> 
> --
> Dhruv Kumar
> PhD Candidate
> Computer Science and Engineering
> University of Minnesota
> www.dhruvkumar.me 
> 
> 



[Spark Core][Advanced]: Wrong memory allocation on standalone mode cluster

2021-04-18 Thread Mohamadreza Rostami
I see a bug in executor memory allocation on a standalone-mode cluster, but I 
can't find which part of the Spark code causes this problem. That's why I decided 
to raise this issue here.
Assume you have 3 workers, each with 10 CPU cores and 10 GB of memory. Assume also 
that you have 2 Spark jobs running on this cluster of workers, with their configs 
set as below (a sketch of how these might be passed follows the configs):
-
job-1:
executor-memory: 5g
executor-CPU: 4
max-cores: 8
--
job-2:
executor-memory: 6g
executor-CPU: 4
max-cores: 8
--
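
For reference, a sketch of how job-1's settings above might be passed when building 
the session (the standalone master URL is hypothetical; job-2 would use 6g instead 
of 5g):

import org.apache.spark.sql.SparkSession

// Sketch for job-1; these are the standard property names behind the shorthand
// in the config block above.
val spark = SparkSession.builder()
  .appName("job-1")
  .master("spark://master:7077")          // hypothetical standalone master URL
  .config("spark.executor.memory", "5g")
  .config("spark.executor.cores", "4")
  .config("spark.cores.max", "8")
  .getOrCreate()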
In this situation, we expect that if we submit both jobs, the first job submitted 
gets 2 executors, each with 4 CPU cores and 5 GB of memory, and the second job gets 
only one executor, on the third worker, with 4 CPU cores and 6 GB of memory, 
because worker 1 and worker 2 do not have enough memory left to accept the second 
job. But surprisingly, we see that the first or second worker also creates an 
executor for job-2, and that worker's memory consumption goes beyond what was 
allocated to it, taking 11 GB of memory from the operating system.
Is this behavior normal? I think it can cause undefined behavior in the cluster.



[Spark Core][Advanced]: Problem with data locality when running Spark query with local nature on apache Hadoop

2021-04-13 Thread Mohamadreza Rostami
I have a Hadoop cluster that uses Apache Spark to query parquet files stored on 
Hadoop. For example, I'm using the following PySpark code to find a word in the 
parquet files:
df = spark.read.parquet("hdfs://test/parquets/*")
df.filter(df['word'] == "jhon").show()
After running this code, I go to the Spark application UI, Stages tab, and I see 
that the locality level summary is set to ANY. In contrast, because of this query's 
nature, it should run locally, at the NODE_LOCAL locality level at least. When I 
check the network I/O of the cluster while the query is running, I find that it 
does use the network (network I/O increases while the query runs). The strange part 
is that the numbers shown in the Spark UI's shuffle section are very small.
How can I find the root cause of this problem and solve it?
Link to the question on stackoverflow.com: 
https://stackoverflow.com/questions/66612906/problem-with-data-locality-when-running-spark-query-with-local-nature-on-apache
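
One way to start narrowing this down (a sketch, assuming a Scala spark-shell on the 
same cluster) is to compare the hosts the executors are registered on with the 
datanodes that actually hold the parquet blocks; if they are different machines, 
NODE_LOCAL is impossible and every scan task can only be scheduled at ANY:

// Sketch: hosts on which the block managers (driver + executors) are registered.
// Compare these with the block locations reported by HDFS, e.g.
//   hdfs fsck <parquet dir> -files -blocks -locations
// If the executor hosts are not among those datanodes, the reads have to go over
// the network no matter what spark.locality.wait is set to.
val executorHosts = spark.sparkContext.getExecutorMemoryStatus
  .keys                         // entries look like "host:port"
  .map(_.split(":")(0))
  .toSet

executorHosts.foreach(println)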