Hi, Michael:
I would like to ask the same question: if the DataFrame is hash partitioned 
and then cached, and I then query/filter by the column that was hashed for 
partitioning, will Spark be smart enough to do partition pruning in this case, 
instead of depending on Parquet's partition pruning? I think that is the 
original question.
Thanks
Yong

From: mich...@databricks.com
Date: Tue, 5 Apr 2016 13:28:46 -0700
Subject: Re: Partition pruning in spark 1.5.2
To: darshan.m...@gmail.com
CC: user@spark.apache.org

The following should ensure partition pruning happens:
df.write.partitionBy("country").save("/path/to/data")
sqlContext.read.load("/path/to/data").where("country = 'UK'")
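For anyone who wants to try this end to end, here is a minimal sketch of the same two calls, assuming a local Spark 1.5.x build. The sample rows, the column name "value", the object name and the path /tmp/partition_pruning_sketch are made up for illustration, and Parquet is chosen explicitly here rather than relying on the default data source. The extended plan printed by explain(true) is where I would look to check which partition directories the scan actually covers.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

// Sketch only: object name, sample rows and path are hypothetical.
object PartitionPruningSketch {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("partition-pruning-sketch")
      .setMaster("local[*]")
    val sc = new SparkContext(conf)
    val sqlContext = new SQLContext(sc)
    import sqlContext.implicits._

    // Tiny made-up data set with a "country" column to partition on.
    val df = sc.parallelize(Seq(("UK", 1), ("US", 2), ("IN", 3)))
      .toDF("country", "value")

    // Hypothetical output location.
    val path = "/tmp/partition_pruning_sketch"

    // Writes one sub-directory per distinct country (country=UK, country=US, ...).
    df.write.mode("overwrite").partitionBy("country").parquet(path)

    // Read the data back and filter on the partition column.
    val uk = sqlContext.read.parquet(path).where("country = 'UK'")

    // Print the logical and physical plans so the scan can be inspected.
    uk.explain(true)
    println(uk.count())

    sc.stop()
  }
}
```

The filter has to be on the column passed to partitionBy; a filter on any other column can at best be pushed down to the Parquet reader.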
On Tue, Apr 5, 2016 at 1:13 PM, Darshan Singh <darshan.m...@gmail.com> wrote:
Thanks for the reply.
Now I saved part_movies as a Parquet file.
Then I created a new DataFrame from the saved Parquet file and did not persist 
it. Then I ran the same query. It still read all 20 partitions, this time 
from HDFS.
So what exactly is the scenario in which partitions will be pruned? I am a bit 
confused now. Isn't there a way to see the exact partition pruning?
Thanks
On Tue, Apr 5, 2016 at 8:59 PM, Michael Armbrust <mich...@databricks.com> wrote:
For the in-memory cache, we still launch tasks, we just skip blocks when 
possible using statistics about those blocks.
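To make the distinction between "no task launched" and "block skipped inside a task" concrete, here is a toy model in plain Scala. It is not Spark's implementation: CachedBatch, cache, mightContain and countEquals are hypothetical names, and the only statistics modelled are a per-batch minimum and maximum of the filtered column.

```scala
// Toy model of statistics-based batch skipping; not Spark's actual code.
final case class CachedBatch(partition: Int, min: Int, max: Int, rows: Vector[Int])

object BatchSkippingSketch {
  // Build one cached batch per (non-empty) partition and record the
  // minimum and maximum value of the column alongside the rows.
  def cache(partitions: Seq[Vector[Int]]): Seq[CachedBatch] =
    partitions.zipWithIndex.map { case (rows, i) =>
      CachedBatch(i, rows.min, rows.max, rows)
    }

  // For a predicate `col = value`, a batch can only contain matches
  // when value lies inside [min, max]; otherwise it can be skipped.
  def mightContain(batch: CachedBatch, value: Int): Boolean =
    batch.min <= value && value <= batch.max

  // One "task" per partition always runs; the task only scans its batch
  // when the statistics say it might match.
  // Returns (tasksLaunched, batchesScanned, matchingRows).
  def countEquals(batches: Seq[CachedBatch], value: Int): (Int, Int, Int) = {
    var scanned = 0
    var matches = 0
    for (batch <- batches) {
      if (mightContain(batch, value)) {
        scanned += 1
        matches += batch.rows.count(_ == value)
      }
    }
    (batches.size, scanned, matches)
  }

  def main(args: Array[String]): Unit = {
    // 20 partitions, each holding three rows for a single movie id (1 to 20).
    val batches = cache((1 to 20).map(id => Vector.fill(3)(id)))
    val (tasks, scanned, matches) = countEquals(batches, 10)
    println(s"tasks=$tasks scanned=$scanned matches=$matches")
  }
}
```

In this model 20 tasks still run for movie = 10, but only one batch is scanned. Whether a real cached block can be skipped depends on how tightly its statistics bound the values it holds.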
On Tue, Apr 5, 2016 at 12:14 PM, Darshan Singh <darshan.m...@gmail.com> wrote:
Thanks. It is not my exact scenario, but I have tried to reproduce it. I have 
used 1.5.2.
I have a part_movies DataFrame which has 20 partitions, one for each movie.
I created the following query:

val part_sql = sqlContext.sql("select * from part_movies where movie = 10")
part_sql.count()
I expect that this should read from just 1 partition, i.e. partition 10. For 
the other partitions it should at most read the metadata and not the data.
Here is the physical plan. I can see the filter, but from this I cannot tell 
whether the filter causes any partition pruning. If pruning is actually 
happening, I would like to see an operator that says so.
== Physical Plan ==
TungstenAggregate(key=[], functions=[(count(1),mode=Final,isDistinct=false)], output=[count#75L])
 TungstenExchange SinglePartition
  TungstenAggregate(key=[], functions=[(count(1),mode=Partial,isDistinct=false)], output=[currentCount#93L])
   Project
    Filter (movie#33 = 10)
     InMemoryColumnarTableScan [movie#33], [(movie#33 = 10)], (InMemoryRelation [movie#33,title#34,genres#35], true, 10000, StorageLevel(true, true, false, true, 1), (Scan PhysicalRDD[movie#33,title#34,genres#35]), None)
However, my assumption that the partitions are not pruned is based not on the 
above plan but on what I see in the job and its stages. I can see that it has 
read the full data of the DataFrame. I should see around 65 KB, as that is 
roughly the average size of each partition.
Aggregated Metrics by Executor

Executor ID | Address         | Task Time | Total Tasks | Failed Tasks | Succeeded Tasks | Input Size / Records | Shuffle Write Size / Records
driver      | localhost:53247 | 0.4 s     | 20          | 0            | 20              | 1289.0 KB / 20       | 840.0 B / 20


Task details follow (only the first few tasks are shown). Here I expect that, 
except for the 1 task that accesses the matching partition's data, all others 
should show either 0 KB or just the size of the metadata read before the 
partition was discarded as not needed. But I can see that all the partitions 
are read.
This is a small example, so it doesn't make a difference, but for a large 
DataFrame reading all the data, even data that is in memory, takes time.
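Tasks

Index | ID | Attempt | Status  | Locality Level | Executor ID / Host | Launch Time         | Duration | Scheduler Delay | Task Deserialization Time | GC Time | Result Serialization Time | Getting Result Time | Peak Execution Memory | Input Size / Records | Write Time | Shuffle Write Size / Records | Errors
0     | 27 | 0       | SUCCESS | PROCESS_LOCAL  | driver / localhost | 2016/04/05 19:01:03 | 39 ms    | 12 ms           | 9 ms                      |         | 0 ms                      | 0 ms                | 0.0 B                 | 66.2 KB (memory) / 1 |            | 42.0 B / 1                   |
1     | 28 | 0       | SUCCESS | PROCESS_LOCAL  | driver / localhost | 2016/04/05 19:01:03 | 41 ms    | 9 ms            | 7 ms                      |         | 0 ms                      | 0 ms                | 0.0 B                 | 63.9 KB (memory) / 1 | 1 ms       | 42.0 B / 1                   |
2     | 29 | 0       | SUCCESS | PROCESS_LOCAL  | driver / localhost | 2016/04/05 19:01:03 | 40 ms    | 7 ms            | 7 ms                      |         | 0 ms                      | 0 ms                | 0.0 B                 | 65.9 KB (memory) / 1 | 1 ms       | 42.0 B / 1                   |
3     | 30 | 0       | SUCCESS | PROCESS_LOCAL  | driver / localhost | 2016/04/05 19:01:03 | 6 ms     | 3 ms            | 5 ms                      |         | 0 ms                      | 0 ms                | 0.0 B                 | 62.0 KB (memory) / 1 |            | 42.0 B / 1                   |
4     | 31 | 0       | SUCCESS | PROCESS_LOCAL  | driver / localhost | 2016/04/05 19:01:03 | 4 ms     | 4 ms            | 6 ms                      |         | 1 ms                      | 0 ms                | 0.0 B                 | 69.2 KB (memory) / 1 |            | 42.0 B / 1                   |
5     | 32 | 0       | SUCCESS | PROCESS_LOCAL  | driver / localhost | 2016/04/05 19:01:03 | 5 ms     | 2 ms            | 5 ms                      |         | 0 ms                      | 0 ms                | 0.0 B                 | 60.3 KB (memory) / 1 |            | 42.0 B / 1                   |
6     | 33 | 0       | SUCCESS | PROCESS_LOCAL  | driver / localhost | 2016/04/05 19:01:03 | 5 ms     | 3 ms            | 4 ms                      |         | 0 ms                      | 0 ms                | 0.0 B                 | 70.3 KB (memory) / 1 |            | 42.0 B / 1                   |
7     | 34 | 0       | SUCCESS | PROCESS_LOCAL  | driver / localhost | 2016/04/05 19:01:03 | 4 ms     | 5 ms            | 4 ms                      |         | 0 ms                      | 0 ms                | 0.0 B                 | 59.7 KB (memory) / 1 |            | 42.0 B / 1                   |
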
Let me know if you need anything else.
Thanks



On Tue, Apr 5, 2016 at 7:29 PM, Michael Armbrust <mich...@databricks.com> wrote:
Can you show your full code? How are you partitioning the data? How are you 
reading it? What is the resulting query plan (run explain() or EXPLAIN)?
On Tue, Apr 5, 2016 at 10:02 AM, dsing001 <darshan.m...@gmail.com> wrote:
Hi,

I am using 1.5.2. I have a DataFrame which is partitioned by country, so I
have around 150 partitions in the DataFrame. When I run Spark SQL with
country = 'UK', it still reads all partitions and is not able to prune the
other partitions. Thus all the queries take a similar time regardless of
which country I pass. Is this expected?

Is there a way to fix this in 1.5.2 by using some parameter, or is it fixed
in later versions?

Thanks

--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Partition-pruning-in-spark-1-5-2-tp26682.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


