It worked fine and I was looking for this only as I do not want cache the
dataframe as the data in some of partitions will change. However, I have
much larger number of partitions(column is not just country but something
where values can be 100's of thousands). Now the metdata is much bigger
than individual partitions. The jobs are taking time in reading the
metadata.

Is there a way to keep just the metadata in cache. I do not want to use
hive.

Will tachyon provide better performance than hdfs in this scenario? I have
to try it yet.

Thanks a lot for all you help.

On Tue, Apr 5, 2016 at 9:41 PM, Darshan Singh <darshan.m...@gmail.com>
wrote:

> Thanks a lot. I will try this one  as well.
>
> On Tue, Apr 5, 2016 at 9:28 PM, Michael Armbrust <mich...@databricks.com>
> wrote:
>
>> The following should ensure partition pruning happens:
>>
>> df.write.partitionBy("country").save("/path/to/data")
>> sqlContext.read.load("/path/to/data").where("country = 'UK'")
>>
>> On Tue, Apr 5, 2016 at 1:13 PM, Darshan Singh <darshan.m...@gmail.com>
>> wrote:
>>
>>> Thanks for the reply.
>>>
>>> Now I saved the part_movies as parquet file.
>>>
>>> Then created new dataframe from the saved parquet file and I did not
>>> persist it. The i ran the same query. It still read all 20 partitions and
>>> this time from hdfs.
>>>
>>> So what will be exact scenario when it will prune partitions. I am bit
>>> confused now. Isnt there a way to see the exact partition pruning?
>>>
>>> Thanks
>>>
>>> On Tue, Apr 5, 2016 at 8:59 PM, Michael Armbrust <mich...@databricks.com
>>> > wrote:
>>>
>>>> For the in-memory cache, we still launch tasks, we just skip blocks
>>>> when possible using statistics about those blocks.
>>>>
>>>> On Tue, Apr 5, 2016 at 12:14 PM, Darshan Singh <darshan.m...@gmail.com>
>>>> wrote:
>>>>
>>>>> Thanks. It is not my exact scenario but I have tried to reproduce it.
>>>>> I have used 1.5.2.
>>>>>
>>>>> I have a part-movies data-frame which has 20 partitions 1 each for a
>>>>> movie.
>>>>>
>>>>> I created following query
>>>>>
>>>>>
>>>>> val part_sql = sqlContext.sql("select * from part_movies where movie =
>>>>> 10")
>>>>> part_sql.count()
>>>>>
>>>>> I expect that this should just read from 1 partition i.e. partition
>>>>> 10. Other partitions it should max read metadata and not the data.
>>>>>
>>>>> here is physical plan. I could see the filter. From here i can not say
>>>>> whether this filter is causing any partition pruning. If actually pruning
>>>>> is happening i would like to see a operator which mentions the same.
>>>>>
>>>>> == Physical Plan ==
>>>>> TungstenAggregate(key=[], 
>>>>> functions=[(count(1),mode=Final,isDistinct=false)], output=[count#75L])
>>>>>  TungstenExchange SinglePartition
>>>>>   TungstenAggregate(key=[], 
>>>>> functions=[(count(1),mode=Partial,isDistinct=false)], 
>>>>> output=[currentCount#93L])
>>>>>    Project
>>>>>     Filter (movie#33 = 10)
>>>>>      InMemoryColumnarTableScan [movie#33], [(movie#33 = 10)], 
>>>>> (InMemoryRelation [movie#33,title#34,genres#35], true, 10000, 
>>>>> StorageLevel(true, true, false, true, 1), (Scan 
>>>>> PhysicalRDD[movie#33,title#34,genres#35]), None)
>>>>>
>>>>>
>>>>> However, my assumption that partition is not pruned is not based on
>>>>> the above plan but when I look at the job and its stages. I could see that
>>>>> it has read full data of the dataframe.  I should see around 65KB as that
>>>>> is almost average size of each partition.
>>>>>
>>>>> Aggregated Metrics by Executor
>>>>> Executor ID Address Task Time Total Tasks Failed Tasks Succeeded Tasks 
>>>>> Input
>>>>> Size / Records Shuffle Write Size / Records
>>>>> driver localhost:53247 0.4 s 20 0 20 1289.0 KB / 20 840.0 B / 20
>>>>>
>>>>>
>>>>> Task details only first 7. Here I expect that except 1 task(which
>>>>> access the partitions data) all others should be either 0 KB or just the
>>>>> size of metadata after which it discarded that partition as its data was
>>>>> not needed. But i could see that all the partitions are read.
>>>>>
>>>>> This is small example so it doesnt make diff but for a large dataframe
>>>>> reading all the data even that in memory takes time.
>>>>>
>>>>> Tasks
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>> 0 27 0 SUCCESS PROCESS_LOCAL driver / localhost 2016/04/05 19:01:03 39
>>>>> ms 12 ms 9 ms 0 ms 0 ms 0.0 B 66.2 KB (memory) / 1 42.0 B / 1
>>>>> 1 28 0 SUCCESS PROCESS_LOCAL driver / localhost 2016/04/05 19:01:03 41
>>>>> ms 9 ms 7 ms 0 ms 0 ms 0.0 B 63.9 KB (memory) / 1 1 ms 42.0 B / 1
>>>>> 2 29 0 SUCCESS PROCESS_LOCAL driver / localhost 2016/04/05 19:01:03 40
>>>>> ms 7 ms 7 ms 0 ms 0 ms 0.0 B 65.9 KB (memory) / 1 1 ms 42.0 B / 1
>>>>> 3 30 0 SUCCESS PROCESS_LOCAL driver / localhost 2016/04/05 19:01:03 6
>>>>> ms 3 ms 5 ms 0 ms 0 ms 0.0 B 62.0 KB (memory) / 1 42.0 B / 1
>>>>> 4 31 0 SUCCESS PROCESS_LOCAL driver / localhost 2016/04/05 19:01:03 4
>>>>> ms 4 ms 6 ms 1 ms 0 ms 0.0 B 69.2 KB (memory) / 1 42.0 B / 1
>>>>> 5 32 0 SUCCESS PROCESS_LOCAL driver / localhost 2016/04/05 19:01:03 5
>>>>> ms 2 ms 5 ms 0 ms 0 ms 0.0 B 60.3 KB (memory) / 1 42.0 B / 1
>>>>> 6 33 0 SUCCESS PROCESS_LOCAL driver / localhost 2016/04/05 19:01:03 5
>>>>> ms 3 ms 4 ms 0 ms 0 ms 0.0 B 70.3 KB (memory) / 1 42.0 B / 1
>>>>> 7 34 0 SUCCESS PROCESS_LOCAL driver / localhost 2016/04/05 19:01:03 4
>>>>> ms 5 ms 4 ms 0 ms 0 ms 0.0 B 59.7 KB (memory) / 1 42.0 B / 1
>>>>>
>>>>> Let me know if you need anything else.
>>>>>
>>>>> Thanks
>>>>>
>>>>>
>>>>>
>>>>>
>>>>> On Tue, Apr 5, 2016 at 7:29 PM, Michael Armbrust <
>>>>> mich...@databricks.com> wrote:
>>>>>
>>>>>> Can you show your full code.  How are you partitioning the data? How
>>>>>> are you reading it?  What is the resulting query plan (run explain() or
>>>>>> EXPLAIN).
>>>>>>
>>>>>> On Tue, Apr 5, 2016 at 10:02 AM, dsing001 <darshan.m...@gmail.com>
>>>>>> wrote:
>>>>>>
>>>>>>> HI,
>>>>>>>
>>>>>>> I am using 1.5.2. I have a dataframe which is partitioned based on
>>>>>>> the
>>>>>>> country. So I have around 150 partition in the dataframe. When I run
>>>>>>> sparksql and use country = 'UK' it still reads all partitions and
>>>>>>> not able
>>>>>>> to prune other partitions. Thus all the queries run for similar times
>>>>>>> independent of what country I pass. Is it desired?
>>>>>>>
>>>>>>> Is there a way to fix this in 1.5.2 by using some parameter or is it
>>>>>>> fixed
>>>>>>> in latest versions?
>>>>>>>
>>>>>>> Thanks
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> --
>>>>>>> View this message in context:
>>>>>>> http://apache-spark-user-list.1001560.n3.nabble.com/Partition-pruning-in-spark-1-5-2-tp26682.html
>>>>>>> Sent from the Apache Spark User List mailing list archive at
>>>>>>> Nabble.com.
>>>>>>>
>>>>>>> ---------------------------------------------------------------------
>>>>>>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>>>>>>> For additional commands, e-mail: user-h...@spark.apache.org
>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
>

Reply via email to