Thanks a lot. I will try this one as well.

On Tue, Apr 5, 2016 at 9:28 PM, Michael Armbrust <mich...@databricks.com> wrote:
> The following should ensure partition pruning happens:
>
> df.write.partitionBy("country").save("/path/to/data")
> sqlContext.read.load("/path/to/data").where("country = 'UK'")
>
> On Tue, Apr 5, 2016 at 1:13 PM, Darshan Singh <darshan.m...@gmail.com>
> wrote:
>
>> Thanks for the reply.
>>
>> I have now saved part_movies as a Parquet file.
>>
>> Then I created a new DataFrame from the saved Parquet file and did not
>> persist it. Then I ran the same query. It still read all 20 partitions,
>> this time from HDFS.
>>
>> So in exactly what scenario will it prune partitions? I am a bit
>> confused now. Isn't there a way to see the partition pruning itself?
>>
>> Thanks
>>
>> On Tue, Apr 5, 2016 at 8:59 PM, Michael Armbrust <mich...@databricks.com>
>> wrote:
>>
>>> For the in-memory cache, we still launch tasks; we just skip blocks
>>> when possible, using statistics about those blocks.
>>>
>>> On Tue, Apr 5, 2016 at 12:14 PM, Darshan Singh <darshan.m...@gmail.com>
>>> wrote:
>>>
>>>> Thanks. It is not my exact scenario, but I have tried to reproduce it.
>>>> I have used 1.5.2.
>>>>
>>>> I have a part_movies DataFrame which has 20 partitions, one for each
>>>> movie.
>>>>
>>>> I created the following query:
>>>>
>>>> val part_sql = sqlContext.sql("select * from part_movies where movie = 10")
>>>> part_sql.count()
>>>>
>>>> I expect that this should read from just 1 partition, i.e. partition 10.
>>>> For the other partitions it should at most read metadata, not the data.
>>>>
>>>> Here is the physical plan. I can see the filter, but from this I cannot
>>>> say whether the filter causes any partition pruning. If pruning is
>>>> actually happening, I would like to see an operator that says so.
>>>>
>>>> == Physical Plan ==
>>>> TungstenAggregate(key=[], functions=[(count(1),mode=Final,isDistinct=false)], output=[count#75L])
>>>>  TungstenExchange SinglePartition
>>>>   TungstenAggregate(key=[], functions=[(count(1),mode=Partial,isDistinct=false)], output=[currentCount#93L])
>>>>    Project
>>>>     Filter (movie#33 = 10)
>>>>      InMemoryColumnarTableScan [movie#33], [(movie#33 = 10)], (InMemoryRelation [movie#33,title#34,genres#35], true, 10000, StorageLevel(true, true, false, true, 1), (Scan PhysicalRDD[movie#33,title#34,genres#35]), None)
>>>>
>>>> However, my assumption that partitions are not pruned is based not on
>>>> the above plan but on what I see in the job and its stages. I can see
>>>> that it has read the full data of the DataFrame. I would expect to see
>>>> around 65 KB read, since that is roughly the average size of each
>>>> partition.
>>>>
>>>> Aggregated Metrics by Executor
>>>> Executor ID | Address | Task Time | Total Tasks | Failed Tasks | Succeeded Tasks | Input Size / Records | Shuffle Write Size / Records
>>>> driver | localhost:53247 | 0.4 s | 20 | 0 | 20 | 1289.0 KB / 20 | 840.0 B / 20
>>>>
>>>> Task details follow (only the first 8, indices 0 to 7). Here I expect
>>>> that, except for the 1 task which accesses the matching partition's
>>>> data, all others should show either 0 KB or just the size of the
>>>> metadata, after which the partition is discarded because its data is
>>>> not needed. But I can see that all the partitions are read.
>>>>
>>>> This is a small example, so it doesn't make a difference, but for a
>>>> large DataFrame reading all the data, even from memory, takes time.
>>>>
>>>> Tasks
>>>>
>>>> 0 27 0 SUCCESS PROCESS_LOCAL driver / localhost 2016/04/05 19:01:03 39 ms 12 ms 9 ms 0 ms 0 ms 0.0 B 66.2 KB (memory) / 1 42.0 B / 1
>>>> 1 28 0 SUCCESS PROCESS_LOCAL driver / localhost 2016/04/05 19:01:03 41 ms 9 ms 7 ms 0 ms 0 ms 0.0 B 63.9 KB (memory) / 1 1 ms 42.0 B / 1
>>>> 2 29 0 SUCCESS PROCESS_LOCAL driver / localhost 2016/04/05 19:01:03 40 ms 7 ms 7 ms 0 ms 0 ms 0.0 B 65.9 KB (memory) / 1 1 ms 42.0 B / 1
>>>> 3 30 0 SUCCESS PROCESS_LOCAL driver / localhost 2016/04/05 19:01:03 6 ms 3 ms 5 ms 0 ms 0 ms 0.0 B 62.0 KB (memory) / 1 42.0 B / 1
>>>> 4 31 0 SUCCESS PROCESS_LOCAL driver / localhost 2016/04/05 19:01:03 4 ms 4 ms 6 ms 1 ms 0 ms 0.0 B 69.2 KB (memory) / 1 42.0 B / 1
>>>> 5 32 0 SUCCESS PROCESS_LOCAL driver / localhost 2016/04/05 19:01:03 5 ms 2 ms 5 ms 0 ms 0 ms 0.0 B 60.3 KB (memory) / 1 42.0 B / 1
>>>> 6 33 0 SUCCESS PROCESS_LOCAL driver / localhost 2016/04/05 19:01:03 5 ms 3 ms 4 ms 0 ms 0 ms 0.0 B 70.3 KB (memory) / 1 42.0 B / 1
>>>> 7 34 0 SUCCESS PROCESS_LOCAL driver / localhost 2016/04/05 19:01:03 4 ms 5 ms 4 ms 0 ms 0 ms 0.0 B 59.7 KB (memory) / 1 42.0 B / 1
>>>>
>>>> Let me know if you need anything else.
>>>>
>>>> Thanks
>>>>
>>>> On Tue, Apr 5, 2016 at 7:29 PM, Michael Armbrust <mich...@databricks.com>
>>>> wrote:
>>>>
>>>>> Can you show your full code? How are you partitioning the data? How
>>>>> are you reading it? What is the resulting query plan (run explain() or
>>>>> EXPLAIN)?
>>>>>
>>>>> On Tue, Apr 5, 2016 at 10:02 AM, dsing001 <darshan.m...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Hi,
>>>>>>
>>>>>> I am using 1.5.2. I have a DataFrame which is partitioned by country,
>>>>>> so I have around 150 partitions in the DataFrame. When I run a Spark
>>>>>> SQL query with country = 'UK', it still reads all partitions and is
>>>>>> not able to prune the other partitions.
>>>>>> Thus all the queries run for similar times, independent of which
>>>>>> country I pass. Is this the intended behavior?
>>>>>>
>>>>>> Is there a way to fix this in 1.5.2 by using some parameter, or is it
>>>>>> fixed in later versions?
>>>>>>
>>>>>> Thanks
>>>>>>
>>>>>> --
>>>>>> View this message in context:
>>>>>> http://apache-spark-user-list.1001560.n3.nabble.com/Partition-pruning-in-spark-1-5-2-tp26682.html
>>>>>> Sent from the Apache Spark User List mailing list archive at
>>>>>> Nabble.com.
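
For readers of this thread, here is a rough, Spark-free sketch of why writing with partitionBy("country") can make pruning possible: each partition value ends up encoded in a directory name such as country=UK, so a reader can discard whole directories by looking at paths alone, without opening any data file. This is only an illustration under stated assumptions, not Spark's implementation. The names PartitionPruningSketch, PartitionedFile, parsePartitionValues and prune are hypothetical, and the file paths and sizes in the usage note are made up.

```scala
// Hypothetical model of a column=value directory layout. Not Spark code.
object PartitionPruningSketch {
  // A data file plus its size; both fields are illustrative.
  final case class PartitionedFile(path: String, sizeBytes: Long)

  // Collect every "column=value" path segment,
  // e.g. ".../country=UK/part-00000.parquet" yields Map("country" -> "UK").
  def parsePartitionValues(path: String): Map[String, String] =
    path.split('/').toSeq.flatMap { segment =>
      segment.split("=", 2) match {
        case Array(column, value) if column.nonEmpty => Some(column -> value)
        case _                                       => None
      }
    }.toMap

  // Keep only the files whose directory-encoded value equals `wanted`.
  // Only paths are inspected here; file contents are never read.
  def prune(files: Seq[PartitionedFile], column: String, wanted: String): Seq[PartitionedFile] =
    files.filter(f => parsePartitionValues(f.path).get(column).contains(wanted))
}
```

With files under /path/to/data/country=UK/ and /path/to/data/country=FR/, calling PartitionPruningSketch.prune(files, "country", "UK") returns only the UK files. A cached, in-memory DataFrame has no such directory layout, which fits the earlier reply that tasks are still launched for every partition and blocks are skipped using statistics.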