Re: Partition pruning in spark 1.5.2
It worked fine, and this is exactly what I was looking for: I do not want to cache the dataframe, because the data in some of the partitions will change.

However, I have a much larger number of partitions (the column is not just country but something whose values can number in the hundreds of thousands). Now the metadata is much bigger than the individual partitions, and the jobs are taking a long time reading the metadata.

Is there a way to keep just the metadata in cache? I do not want to use Hive. Will Tachyon provide better performance than HDFS in this scenario? I have yet to try it.

Thanks a lot for all your help.

On Tue, Apr 5, 2016 at 9:41 PM, Darshan Singh wrote:
> Thanks a lot. I will try this one as well.
>
> On Tue, Apr 5, 2016 at 9:28 PM, Michael Armbrust wrote:
>> The following should ensure partition pruning happens:
>>
>> df.write.partitionBy("country").save("/path/to/data")
>> sqlContext.read.load("/path/to/data").where("country = 'UK'")
RE: Partition pruning in spark 1.5.2
Hi Michael,

I would like to ask the same question: if the DataFrame is hash partitioned and then cached, and I then query/filter by the column that was hashed for partitioning, will Spark be smart enough to do partition pruning in this case, instead of depending on Parquet's partition pruning? I think that is the original question.

Thanks
Yong

From: mich...@databricks.com
Date: Tue, 5 Apr 2016 13:28:46 -0700
Subject: Re: Partition pruning in spark 1.5.2
To: darshan.m...@gmail.com
CC: user@spark.apache.org

The following should ensure partition pruning happens:

df.write.partitionBy("country").save("/path/to/data")
sqlContext.read.load("/path/to/data").where("country = 'UK'")
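To illustrate why pruning is possible in principle for a hash-partitioned dataset, here is a minimal plain-Scala sketch of how a hash partitioner maps a key to exactly one partition index. It follows the non-negative modulo rule used by Spark's HashPartitioner; the object and method names are made up for this example, and it says nothing about whether Spark SQL 1.5.2 actually applies this to a cached DataFrame, which this thread does not settle.

```scala
// Illustrative model only: HashPartitionSketch, partitionFor and
// partitionsToScan are hypothetical names, not Spark APIs.
object HashPartitionSketch {
  // Map a key to a partition index in [0, numPartitions).
  // A null key goes to partition 0; negative hash codes are shifted
  // so that the result is never negative.
  def partitionFor(key: Any, numPartitions: Int): Int = key match {
    case null => 0
    case _ =>
      val raw = key.hashCode % numPartitions
      if (raw < 0) raw + numPartitions else raw
  }

  // For an equality filter on the partitioning key, only one partition
  // can contain matching rows.
  def partitionsToScan(key: Any, numPartitions: Int): Seq[Int] =
    Seq(partitionFor(key, numPartitions))
}
```

With 20 partitions, HashPartitionSketch.partitionFor(10, 20) is 10, so a filter such as movie = 10 could in principle be answered from a single partition. At the RDD level, lookup on a pair RDD with a known partitioner is documented to search only the partition the key maps to.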
Re: Partition pruning in spark 1.5.2
Thanks a lot. I will try this one as well.

On Tue, Apr 5, 2016 at 9:28 PM, Michael Armbrust wrote:
> The following should ensure partition pruning happens:
>
> df.write.partitionBy("country").save("/path/to/data")
> sqlContext.read.load("/path/to/data").where("country = 'UK'")
Re: Partition pruning in spark 1.5.2
The following should ensure partition pruning happens:

df.write.partitionBy("country").save("/path/to/data")
sqlContext.read.load("/path/to/data").where("country = 'UK'")

On Tue, Apr 5, 2016 at 1:13 PM, Darshan Singh wrote:
> Thanks for the reply.
>
> I have now saved part_movies as a Parquet file.
>
> I then created a new dataframe from the saved Parquet file and did not
> persist it. Then I ran the same query. It still read all 20 partitions,
> this time from HDFS.
>
> So what is the exact scenario in which it will prune partitions? I am a
> bit confused now. Isn't there a way to see exactly which partitions are
> pruned?
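As a rough picture of what the two lines above rely on: partitionBy("country") writes one directory per value, named country=<value>, and a filter on that column can then be answered by choosing directories before any data file is opened. The sketch below models that selection in plain Scala. It is not Spark's implementation; PartitionDirSketch and the part-0000N.parquet file names are hypothetical.

```scala
// Illustrative model of directory-based partition pruning.
// Names and file paths here are assumptions made for the example.
object PartitionDirSketch {
  // Extract the value of `column` from a path segment such as "country=UK".
  def partitionValue(path: String, column: String): Option[String] =
    path.split("/").collectFirst {
      case seg if seg.startsWith(column + "=") =>
        seg.substring(column.length + 1)
    }

  // Keep only the files that live under the directory for `wanted`.
  def prune(files: Seq[String], column: String, wanted: String): Seq[String] =
    files.filter(f => partitionValue(f, column).contains(wanted))
}
```

Given files under /path/to/data/country=UK/ and /path/to/data/country=FR/, PartitionDirSketch.prune(files, "country", "UK") keeps only the UK files, which is the effect the where("country = 'UK'") filter is meant to have on the scan.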
Re: Partition pruning in spark 1.5.2
Thanks for the reply.

I have now saved part_movies as a Parquet file. I then created a new dataframe from the saved Parquet file and did not persist it. Then I ran the same query. It still read all 20 partitions, this time from HDFS.

So what is the exact scenario in which it will prune partitions? I am a bit confused now. Isn't there a way to see exactly which partitions are pruned?

Thanks

On Tue, Apr 5, 2016 at 8:59 PM, Michael Armbrust wrote:
> For the in-memory cache, we still launch tasks; we just skip blocks when
> possible using statistics about those blocks.
Re: Partition pruning in spark 1.5.2
For the in-memory cache, we still launch tasks; we just skip blocks when possible using statistics about those blocks.

On Tue, Apr 5, 2016 at 12:14 PM, Darshan Singh wrote:
> Thanks. It is not my exact scenario, but I have tried to reproduce it. I
> used 1.5.2.
>
> I have a part_movies dataframe which has 20 partitions, one for each movie.
>
> I created the following query:
>
> val part_sql = sqlContext.sql("select * from part_movies where movie = 10")
> part_sql.count()
>
> I expect that this should read from just one partition, i.e. partition 10.
> For the other partitions it should at most read the metadata and not the
> data.
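A small plain-Scala model of the behaviour described above may help: every partition still gets a task, but a cached block whose statistics rule out the filter value is skipped. The sketch assumes the statistics are a per-block minimum and maximum of the filtered column; that choice, and all names in it, are assumptions for illustration rather than a description of Spark's internals.

```scala
// Illustrative model of skipping cached blocks by statistics.
// BatchStatsSketch and its members are hypothetical names.
object BatchStatsSketch {
  // One cached block of a column, summarised by the smallest and
  // largest value it contains (assumed statistics).
  final case class BatchStats(batchId: Int, min: Int, max: Int)

  // A block can only contain `value` if value lies inside [min, max].
  def mightContain(stats: BatchStats, value: Int): Boolean =
    stats.min <= value && value <= stats.max

  // Ids of the blocks that must actually be scanned for `column = value`.
  def batchesToScan(all: Seq[BatchStats], value: Int): Seq[Int] =
    all.filter(mightContain(_, value)).map(_.batchId)
}
```

With 20 blocks that each hold a single movie id, only the block for movie 10 survives the check. If the blocks held overlapping ranges instead, none could be ruled out and all would be scanned, so how much is skipped depends on how the data is laid out.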
Re: Partition pruning in spark 1.5.2
Thanks. It is not my exact scenario, but I have tried to reproduce it. I used 1.5.2.

I have a part_movies dataframe which has 20 partitions, one for each movie.

I created the following query:

val part_sql = sqlContext.sql("select * from part_movies where movie = 10")
part_sql.count()

I expect that this should read from just one partition, i.e. partition 10. For the other partitions it should at most read the metadata and not the data.

Here is the physical plan. I can see the filter, but from the plan I cannot tell whether the filter is causing any partition pruning. If pruning is actually happening, I would like to see an operator that says so.

== Physical Plan ==
TungstenAggregate(key=[], functions=[(count(1),mode=Final,isDistinct=false)], output=[count#75L])
 TungstenExchange SinglePartition
  TungstenAggregate(key=[], functions=[(count(1),mode=Partial,isDistinct=false)], output=[currentCount#93L])
   Project
    Filter (movie#33 = 10)
     InMemoryColumnarTableScan [movie#33], [(movie#33 = 10)], (InMemoryRelation [movie#33,title#34,genres#35], true, 1, StorageLevel(true, true, false, true, 1), (Scan PhysicalRDD[movie#33,title#34,genres#35]), None)

However, my assumption that partitions are not pruned is based not on the above plan but on what I see when I look at the job and its stages. I can see that it has read the full data of the dataframe. I should see around 65 KB, as that is roughly the average size of each partition.

Aggregated Metrics by Executor

Executor ID  Address          Task Time  Total Tasks  Failed Tasks  Succeeded Tasks  Input Size / Records  Shuffle Write Size / Records
driver       localhost:53247  0.4 s      20           0             20               1289.0 KB / 20        840.0 B / 20

Task details (indexes 0 to 7 only). Here I expect that, except for one task (the one that accesses the matching partition's data), all others should show either 0 KB or just the size of the metadata, after which the partition is discarded because its data is not needed. But I can see that all the partitions are read.

This is a small example, so it does not make a difference, but for a large dataframe, reading all the data, even from memory, takes time.

Tasks

0 27 0 SUCCESS PROCESS_LOCAL driver / localhost 2016/04/05 19:01:03 39 ms 12 ms 9 ms 0 ms 0 ms 0.0 B 66.2 KB (memory) / 1 42.0 B / 1
1 28 0 SUCCESS PROCESS_LOCAL driver / localhost 2016/04/05 19:01:03 41 ms 9 ms 7 ms 0 ms 0 ms 0.0 B 63.9 KB (memory) / 1 1 ms 42.0 B / 1
2 29 0 SUCCESS PROCESS_LOCAL driver / localhost 2016/04/05 19:01:03 40 ms 7 ms 7 ms 0 ms 0 ms 0.0 B 65.9 KB (memory) / 1 1 ms 42.0 B / 1
3 30 0 SUCCESS PROCESS_LOCAL driver / localhost 2016/04/05 19:01:03 6 ms 3 ms 5 ms 0 ms 0 ms 0.0 B 62.0 KB (memory) / 1 42.0 B / 1
4 31 0 SUCCESS PROCESS_LOCAL driver / localhost 2016/04/05 19:01:03 4 ms 4 ms 6 ms 1 ms 0 ms 0.0 B 69.2 KB (memory) / 1 42.0 B / 1
5 32 0 SUCCESS PROCESS_LOCAL driver / localhost 2016/04/05 19:01:03 5 ms 2 ms 5 ms 0 ms 0 ms 0.0 B 60.3 KB (memory) / 1 42.0 B / 1
6 33 0 SUCCESS PROCESS_LOCAL driver / localhost 2016/04/05 19:01:03 5 ms 3 ms 4 ms 0 ms 0 ms 0.0 B 70.3 KB (memory) / 1 42.0 B / 1
7 34 0 SUCCESS PROCESS_LOCAL driver / localhost 2016/04/05 19:01:03 4 ms 5 ms 4 ms 0 ms 0 ms 0.0 B 59.7 KB (memory) / 1 42.0 B / 1

Let me know if you need anything else.

Thanks

On Tue, Apr 5, 2016 at 7:29 PM, Michael Armbrust wrote:
> Can you show your full code? How are you partitioning the data? How are
> you reading it? What is the resulting query plan (run explain() or
> EXPLAIN)?
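The size argument in this message can be checked with a few lines of plain Scala. The figures are copied from the Spark UI output quoted above; the object and field names are made up for the example.

```scala
// Arithmetic check on the reported input sizes.
// InputSizeCheck is a hypothetical name; the numbers come from the message.
object InputSizeCheck {
  val totalInputKb: Double = 1289.0 // "Input Size" in Aggregated Metrics
  val totalTasks: Int = 20          // "Total Tasks" in Aggregated Metrics

  // Input sizes (KB) of the eight tasks listed, indexes 0 to 7.
  val listedTasksKb: Seq[Double] =
    Seq(66.2, 63.9, 65.9, 62.0, 69.2, 60.3, 70.3, 59.7)

  // Average input per task over the whole stage.
  def averagePerTaskKb: Double = totalInputKb / totalTasks

  // Input read by the eight listed tasks alone.
  def listedSumKb: Double = listedTasksKb.sum
}
```

The average comes to about 64.45 KB per task, which matches the "around 65 KB" partition size, and the eight listed tasks alone account for about 517.5 KB. Had only partition 10 been read, the stage total would have been close to 65 KB rather than 1289.0 KB.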
Re: Partition pruning in spark 1.5.2
Can you show your full code? How are you partitioning the data? How are you reading it? What is the resulting query plan (run explain() or EXPLAIN)?

On Tue, Apr 5, 2016 at 10:02 AM, dsing001 wrote:
> Hi,
>
> I am using 1.5.2. I have a dataframe which is partitioned based on the
> country, so I have around 150 partitions in the dataframe. When I run
> Spark SQL and use country = 'UK', it still reads all partitions and is not
> able to prune the other partitions. Thus all the queries run for similar
> times, independent of which country I pass. Is this the intended behavior?
>
> Is there a way to fix this in 1.5.2 by using some parameter, or is it
> fixed in the latest versions?
>
> Thanks
Partition pruning in spark 1.5.2
Hi,

I am using 1.5.2. I have a dataframe which is partitioned based on the country, so I have around 150 partitions in the dataframe. When I run Spark SQL and use country = 'UK', it still reads all partitions and is not able to prune the other partitions. Thus all the queries run for similar times, independent of which country I pass. Is this the intended behavior?

Is there a way to fix this in 1.5.2 by using some parameter, or is it fixed in the latest versions?

Thanks

--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Partition-pruning-in-spark-1-5-2-tp26682.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org