Re: cache table vs. parquet table performance
Hi Tomas,

One option is to cache your table as Parquet files in Alluxio (which can serve as an in-memory distributed caching layer for Spark in your case). The Spark code would look like:

df.write.parquet("alluxio://master:19998/data.parquet")
df = sqlContext.read.parquet("alluxio://master:19998/data.parquet")

(See more details in the documentation: http://www.alluxio.org/docs/1.8/en/compute/Spark.html) This requires running Alluxio as a separate service (ideally colocated with the Spark servers), of course, but it also enables data sharing across Spark jobs.

- Bin

On Tue, Jan 15, 2019 at 10:29 AM Tomas Bartalos wrote:
> [...]
Re: cache table vs. parquet table performance
I believe the in-memory solution misses the storage indexes that Parquet/ORC have. The in-memory solution is more suitable if you iterate over the whole data set frequently.

> On 15.01.2019 at 19:20, Tomas Bartalos wrote:
> [...]
Re: cache table vs. parquet table performance
Hi Tomas,

Have you considered using something like https://www.alluxio.org/ for your cache? It seems like a possible solution for what you're trying to do.

-Todd

On Tue, Jan 15, 2019 at 11:24 PM 大啊 wrote:
> Hi, Tomas.
> Thanks for your question, it gave me something to think about. But cache is usually best used for storing smaller data sets;
> I think caching large data will consume too much memory or disk space.
> Spilling the cached data in Parquet format might be a good improvement.
>
> At 2019-01-16 02:20:56, "Tomas Bartalos" wrote:
> [...]
cache table vs. parquet table performance
Hello,

I'm using spark-thrift server and I'm searching for the best performing solution to query a hot set of data. I'm processing records with a nested structure, containing subtypes and arrays. One record takes up several KB.

I tried to make some improvement with cache table:

cache table event_jan_01 as select * from events where day_registered = 20190102;

If I understood correctly, the data should be stored in *in-memory columnar* format with storage level MEMORY_AND_DISK, so data which doesn't fit in memory will be spilled to disk (I assume also in columnar format?). I cached 1 day of data (1 M records), and according to the Spark UI storage tab none of the data was cached to memory and everything was spilled to disk. The size of the data was *5.7 GB.* Typical queries took ~20 sec.

Then I tried to store the data in Parquet format:

CREATE TABLE event_jan_01_par USING parquet location "/tmp/events/jan/02" as select * from event_jan_01;

The whole Parquet table took up only *178 MB*, and typical queries took 5-10 sec.

Is it possible to tune Spark to spill the cached data in Parquet format? Why was the whole cached table spilled to disk, with nothing staying in memory?

Spark version: 2.4.0

Best regards,
Tomas
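For comparison, the storage level of a cached DataFrame can be chosen explicitly through the API rather than the SQL `cache table` statement. This is a minimal sketch, not something from the thread: the filter mirrors the query above, while the use of `spark` as the session handle and the choice of the serialized MEMORY_AND_DISK_SER level are assumptions; the serialized level is usually much more compact than the default deserialized cache, though still not Parquet-compressed.

```scala
import org.apache.spark.storage.StorageLevel

// Sketch: cache one day of events with an explicitly chosen storage level.
// MEMORY_AND_DISK_SER stores serialized bytes, trading CPU for a smaller footprint.
val events = spark.table("events").where("day_registered = 20190102")
events.persist(StorageLevel.MEMORY_AND_DISK_SER)
events.count()  // an action materializes the cache
```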
Re: Will spark cache table once even if I call read/cache on the same table multiple times
If you have 2 different RDDs (2 different references with different RDD IDs, as shown in your example), then YES, Spark will cache two copies of exactly the same data in memory. There is no way for Spark to compare them and know that they have the same content. You defined them as 2 RDDs, so they are different RDDs, and they will be cached individually.

Yong

From: Taotao.Li <charles.up...@gmail.com>
Sent: Sunday, November 20, 2016 6:18 AM
To: Rabin Banerjee
Cc: Yong Zhang; user; Mich Talebzadeh; Tathagata Das
Subject: Re: Will spark cache table once even if I call read/cache on the same table multiple times

hi, you can check my stackoverflow question: http://stackoverflow.com/questions/36195105/what-happens-if-i-cache-the-same-rdd-twice-in-spark/36195812#36195812

On Sat, Nov 19, 2016 at 3:16 AM, Rabin Banerjee <dev.rabin.baner...@gmail.com> wrote:
> [...]
Re: Will spark cache table once even if I call read/cache on the same table multiple times
hi, you can check my stackoverflow question: http://stackoverflow.com/questions/36195105/what-happens-if-i-cache-the-same-rdd-twice-in-spark/36195812#36195812

On Sat, Nov 19, 2016 at 3:16 AM, Rabin Banerjee <dev.rabin.baner...@gmail.com> wrote:
> [...]

--
___
Quant | Engineer | Boy
___
blog: http://litaotao.github.io
github: www.github.com/litaotao
Re: Will spark cache table once even if I call read/cache on the same table multiple times
Hi Yong,

But every time val tabdf = sqlContext.table(tablename) is called, tabdf.rdd has a new id, which can be checked by calling tabdf.rdd.id. And, per https://github.com/apache/spark/blob/b6de0c98c70960a97b07615b0b08fbd8f900fbe7/core/src/main/scala/org/apache/spark/SparkContext.scala#L268, Spark maintains a map of [RDD_ID, RDD]. As the RDD id changes, will Spark cache the same data again and again?

For example,

val tabdf = sqlContext.table("employee")
tabdf.cache()
tabdf.someTransformation.someAction
println(tabdf.rdd.id)
val tabdf1 = sqlContext.table("employee")
tabdf1.cache() // <= *Will Spark again read from disk and load the data into memory, or look into the cache?*
tabdf1.someTransformation.someAction
println(tabdf1.rdd.id)

Regards,
R Banerjee

On Fri, Nov 18, 2016 at 9:14 PM, Yong Zhang <java8...@hotmail.com> wrote:
> [...]
Re: Will spark cache table once even if I call read/cache on the same table multiple times
That's correct, as long as you don't change the StorageLevel.

https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/rdd/RDD.scala#L166

Yong

From: Rabin Banerjee <dev.rabin.baner...@gmail.com>
Sent: Friday, November 18, 2016 10:36 AM
To: user; Mich Talebzadeh; Tathagata Das
Subject: Will spark cache table once even if I call read/cache on the same table multiple times

[...]
Will spark cache table once even if I call read/cache on the same table multiple times
Hi All,

I am working on a project where the code is divided into multiple reusable modules. I am not able to understand Spark persist/cache in that context.

My question is: will Spark cache a table once, even if I call read/cache on the same table multiple times?

Sample code:

TableReader::

def getTableDF(tablename: String, persist: Boolean = false): DataFrame = {
  val tabdf = sqlContext.table(tablename)
  if (persist) {
    tabdf.cache()
  }
  tabdf
}

Now
Module1::
val emp = TableReader.getTableDF("employee")
emp.someTransformation.someAction

Module2::
val emp = TableReader.getTableDF("employee")
emp.someTransformation.someAction

ModuleN::
val emp = TableReader.getTableDF("employee")
emp.someTransformation.someAction

Will Spark cache the emp table once, or will it cache it every time I call this? Shall I maintain a global hashmap to handle that, something like Map[String, DataFrame]?

Regards,
Rabin Banerjee
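On the global-hashmap idea: my understanding is that a repeated cache() on the same table name is already deduplicated by Spark's cache manager (Yong confirms this elsewhere in the thread), but if you do want the map explicitly, a minimal sketch could look like the following. `CachedTableReader` and the explicit SQLContext parameter are illustrative names, not from the thread.

```scala
import scala.collection.mutable
import org.apache.spark.sql.{DataFrame, SQLContext}

// Sketch: memoize DataFrames by table name so cache() is only requested
// once per table, no matter how many modules ask for it.
object CachedTableReader {
  private val cached = mutable.Map[String, DataFrame]()

  def getTableDF(sqlContext: SQLContext, tablename: String): DataFrame =
    synchronized {
      cached.getOrElseUpdate(tablename, {
        val df = sqlContext.table(tablename)
        df.cache()
        df
      })
    }
}
```

The synchronized block keeps the map consistent if modules run on multiple driver threads.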
How to verify in Spark 1.6.x usage, User Memory used after Cache table
Hi Team,

I am using the HDP 2.4 Sandbox to check Spark 1.6's memory feature. I connected to Spark via the Spark thrift server through Squirrel (a JDBC client) and executed the CACHE command to cache a Hive table. The command executes successfully and the SQL returns data within seconds. But I did not find any way to check whether Spark is using User Memory or not. Please let me know how we can verify this scenario.

Thanks,
Yogesh

--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/How-to-verify-in-Spark-1-6-x-usage-User-Memory-used-after-Cache-table-tp27343.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.
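Besides the Storage tab of the UI, caching can be inspected programmatically from the driver; a sketch (in Spark 1.x, `getRDDStorageInfo` is marked as a developer API, and you would need driver access rather than just a JDBC session, so this is only a rough check):

```scala
// Sketch: print what is cached and how much memory/disk each cached RDD uses.
// Run on the driver after the CACHE TABLE command has executed.
sc.getRDDStorageInfo.foreach { info =>
  println(s"${info.name}: ${info.numCachedPartitions}/${info.numPartitions} " +
    s"partitions cached, ${info.memSize} bytes in memory, ${info.diskSize} bytes on disk")
}
```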
Re: Re: About cache table performance in spark sql
Hi,

Parquet data is column-wise and highly compressed, so the size of the deserialized rows in Spark can be bigger than that of the Parquet data on disk. That is, I think the 24.59 GB of Parquet data becomes (18.1 GB + 23.6 GB) of data in Spark. Yes, cached data in Spark is also compressed by default, but Spark uses simpler compression algorithms than Parquet does, and it seems to me the compression ratios are typically worse than Parquet's.

On Thu, Feb 4, 2016 at 3:16 PM, fightf...@163.com <fightf...@163.com> wrote:
> [...]

--
---
Takeshi Yamamuro
Re: Re: About cache table performance in spark sql
Oh, thanks. Makes sense to me.

Best,
Sun.

fightf...@163.com

From: Takeshi Yamamuro
Date: 2016-02-04 16:01
To: fightf...@163.com
CC: user
Subject: Re: Re: About cache table performance in spark sql

[...]
Re: Re: About cache table performance in spark sql
Hi,

Thanks a lot for your explanation. I know that the slow processing is mainly caused by GC pressure, and I now understand the difference thanks to your advice.

I gave each executor 6 GB of memory and tried to cache the table. I had 3 executors, and I can see some info in the Spark job UI storage tab, like the following:

RDD Name: In-memory table video1203
Storage Level: Memory Deserialized 1x Replicated
Cached Partitions: 251
Fraction Cached: 100%
Size in Memory: 18.1 GB
Size in ExternalBlockStore: 0.0 B
Size on Disk: 23.6 GB

I can see that Spark SQL tries to cache data into memory, and when I run the following queries over this table video1203, I get fast responses. Another thing that confused me is the data sizes above (in memory and on disk). I can see that the in-memory data size is 18.1 GB, which almost equals the sum of my executor memory. But why is the disk size 23.6 GB? From Impala I get an overall Parquet file size of about 24.59 GB. It would be good to have some clarification on this.

Best,
Sun.

fightf...@163.com

From: Prabhu Joseph
Date: 2016-02-04 14:35
To: fightf...@163.com
CC: user
Subject: Re: About cache table performance in spark sql

[...]
Re: About cache table performance in spark sql
Sun,

When an executor doesn't have enough memory and tries to cache the data, it spends a lot of time on GC, and hence the job will be slow. Either:

1. Allocate enough memory to cache all the RDDs, and hence the job will complete fast; or
2. Don't use cache when there is not enough executor memory.

To check the GC time, use --conf "spark.executor.extraJavaOptions=-XX:+PrintGCDetails -XX:+PrintGCTimeStamps" while submitting the job, and SPARK_WORKER_DIR will have sysout with GC details. The sysout will show many "Full GC" events happening when cache is used and the executor does not have enough heap.

Thanks,
Prabhu Joseph

On Thu, Feb 4, 2016 at 11:25 AM, fightf...@163.com <fightf...@163.com> wrote:
> [...]
About cache table performance in spark sql
Hi,

I want to make sure that cache table would indeed accelerate SQL queries. Here is one of my use cases:

Impala table size: 24.59 GB, no partitions, with about 1 billion+ rows. I use sqlContext.sql to run queries over this table, and I try the cache and uncache commands to see if there is any performance disparity. I ran the following query:

select * from video1203 where id > 10 and id < 20 and added_year != 1989

I can see the following results:

1. If I do not cache the table and just run sqlContext.sql(), the above query runs in about 25 seconds.
2. If I first run sqlContext.cacheTable("video1203"), the query runs super slow and can cause a driver OOM exception, but I get final results after about 9 minutes.

Could any expert explain this for me? I can see that cacheTable causes the OOM because the in-memory columnar storage cannot hold the 24.59 GB+ table in memory. But why is the performance so different, and even so bad?

Best,
Sun.

fightf...@163.com
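As an aside, a sketch of one thing worth trying when a table is bigger than executor memory (this is an assumption on my part, not something tested in the thread): persist through the DataFrame API with a serialized, spillable storage level instead of `cacheTable`'s default deserialized cache, which might reduce memory pressure, though whether it avoids the OOM depends on the data and executor sizing.

```scala
import org.apache.spark.storage.StorageLevel

// Sketch: persist with an explicit storage level instead of cacheTable's default.
// MEMORY_AND_DISK_SER keeps blocks serialized and spills what does not fit.
val video = sqlContext.table("video1203")
video.persist(StorageLevel.MEMORY_AND_DISK_SER)
video.count()  // an action materializes the cache
```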
Cache table as
Hi all,

I'm connected to the thrift server using beeline on Spark 1.6. I used:

cache table tbl as select * from table1

I see table1 in the storage memory, and I can use it. But when I reconnect, I can't query it anymore. I get:

Error: org.apache.spark.sql.AnalysisException: Table not found: table1; line 1 pos 43 (state=,code=0)

However, the table is still in the cache storage. Any idea?

Younes
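My reading (not verified here) is that the `AS SELECT` form registers a temporary view, and temporary views are scoped to the session that created them, so a reconnect loses the name even though cached blocks may linger in storage. A workaround sketch, shown as the equivalent driver-side calls, is to cache an existing permanent table by name, which any session can then reference:

```scala
// Sketch: cache a permanent table by name rather than a session-scoped
// "cache table ... as select ..." temporary view.
sqlContext.sql("CACHE TABLE table1")
sqlContext.sql("SELECT count(*) FROM table1").show()
```

From beeline the same `CACHE TABLE table1` statement can be issued directly.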
Why does "cache table a as select * from b" do a shuffle and create 2 stages?
Why does "cache table a as select * from b" do a shuffle and create 2 stages? Example: table "ods_pay_consume" comes from "KafkaUtils.createDirectStream". hiveContext.sql("cache table dwd_pay_consume as select * from ods_pay_consume") creates 2 stages in the DAG. hiveContext.sql("cache table dw_game_server_recharge as select * from dwd_pay_consume") also creates 2 stages, and according to the DAG Visualization tool it appears to recompute from the beginning; the "cache table dwd_pay_consume" seems to have no effect.
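A quick way to check whether the second statement actually reads the cache (a sketch; the table name follows the example above): run EXPLAIN and look for an in-memory scan in the physical plan.

```scala
// If the cache is effective, the physical plan should contain an
// InMemoryColumnarTableScan over dwd_pay_consume instead of a plan that
// re-reads the Kafka-derived source table from the beginning.
hiveContext.sql("explain extended select * from dwd_pay_consume")
  .collect()
  .foreach(println)
```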
SparkSQL cache table with multiple replicas
Hi all, do you know if there is an option to specify how many replicas we want when caching a table in memory in the SparkSQL Thrift server? I have not seen any option so far, but I assumed there is one, since the Storage section of the UI shows that there is 1 x replica of your DataFrame/table. I believe there is a good use case where you want to replicate a dimension table across your nodes to improve response times when running typical BI/DWH types of queries (just to avoid having to broadcast the data time and again). Do you think that would be a good addition to SparkSQL? Regards.
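I am not aware of a SQL-level option for this, but the replicated storage levels do exist at the DataFrame level; a hedged sketch (the dimension table name is illustrative):

```scala
import org.apache.spark.storage.StorageLevel

// MEMORY_ONLY_2 keeps two in-memory copies of each cached partition, so a
// scan can be served locally on either replica's node.
val dim = sqlContext.table("dim_customer")   // illustrative dimension table
dim.persist(StorageLevel.MEMORY_ONLY_2)
dim.registerTempTable("dim_customer_cached") // expose the replicated copy
```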
how to cache table with OFF_HEAP storage level in SparkSQL thriftserver
Hi all: I have a Spark on YARN cluster (spark-1.3.0, hadoop-2.2.0) with hive-0.12.0 and tachyon-0.6.1. I start the SparkSQL Thrift server with start-thriftserver.sh and use beeline to connect to it, following the Spark documentation. My question is: how do I cache a table with a specified storage level, such as OFF_HEAP in my case? I have dug into the Spark documentation and the spark-user mailing list and did not get any idea. If I run `cache table TABLENAME` at the beeline prompt, the monitoring UI shows the RDD cached at the default storage level (MEMORY_ONLY), which is not what I want. Thanks
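The `cache table` statement does not accept a storage level, so one possible workaround (a sketch, not verified on the 1.3.0 + Tachyon combination described above) is to persist the table programmatically at the desired level before exposing it through the Thrift server:

```scala
import org.apache.spark.storage.StorageLevel

// Persist with OFF_HEAP (Tachyon-backed in Spark 1.3) instead of the
// default MEMORY_ONLY used by a plain `cache table` statement.
val t = hiveContext.table("TABLENAME")
t.persist(StorageLevel.OFF_HEAP)
t.registerTempTable("TABLENAME_offheap")   // query this name from beeline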
Re: HiveContext: cache table not supported for partitioned table?
Thanks for your explanation.
From: Cheng Lian <lian.cs@gmail.com>
Date: Thursday, October 2, 2014 at 8:01 PM
To: Du Li <l...@yahoo-inc.com.INVALID>, d...@spark.apache.org
Cc: user@spark.apache.org
Subject: Re: HiveContext: cache table not supported for partitioned table?
HiveContext: cache table not supported for partitioned table?
Hi, in Spark 1.1 HiveContext, I ran a command to create a partitioned table, followed by a cache table command, and got a java.sql.SQLSyntaxErrorException: Table/View 'PARTITIONS' does not exist. But cache table worked fine if the table was not partitioned. Can anybody confirm that caching a partitioned table is not supported in the current version? Thanks, Du
Re: HiveContext: cache table not supported for partitioned table?
Cache table works with partitioned tables. I guess you're experimenting with a default local metastore, and the metastore_db directory doesn't exist in the first place. In this case, none of the metastore tables/views exist at first, and the error message you saw is thrown when the PARTITIONS metastore table is accessed for the first time by the Hive client. However, you should also see this line before the error: 14/10/03 10:51:30 ERROR ObjectStore: Direct SQL failed, falling back to ORM. The table is then created on the fly, and the cache operation is performed normally. You can verify this by selecting from the table and checking the Spark UI for cached RDDs. If you uncache the table and cache it again, you won't see this error any more. In a production environment you normally won't see this error, because the metastore database is usually set up ahead of time.
On 10/3/14 3:39 AM, Du Li wrote:
> Hi, in Spark 1.1 HiveContext, I ran a command to create a partitioned table, followed by a cache table command, and got a java.sql.SQLSyntaxErrorException: Table/View 'PARTITIONS' does not exist. But cache table worked fine if the table was not partitioned. Can anybody confirm that caching a partitioned table is not supported in the current version? Thanks, Du
Re: Potential Thrift Server Bug on Spark SQL,perhaps with cache table?
Hi John, I tried to follow your description but failed to reproduce this issue. Would you mind providing some more details? Especially:
- The exact Git commit hash of the snapshot version you were using. Mine: e0f946265b9ea5bc48849cf7794c2c03d5e29fba https://github.com/apache/spark/commit/e0f946265b9ea5bc48849cf7794c2c03d5e29fba
- Compilation flags (Hadoop version, profiles enabled, etc.). Mine: ./sbt/sbt -Pyarn,kinesis-asl,hive,hadoop-2.3 -Dhadoop.version=2.3.0 clean assembly/assembly
- Also, it would be great if you could provide the schema of your table, plus some sample data that can help reproduce the issue.
Cheng
On Wed, Aug 20, 2014 at 6:11 AM, John Omernik <j...@omernik.com> wrote:
> I am working with Spark SQL and the Thrift server. I ran into an interesting bug, and I am curious what information/testing I can provide to help narrow things down. My setup is as follows: Hive 0.12 with a table that has lots of columns (50+), stored as RCFile; Spark-1.1.0-SNAPSHOT with Hive built in (and the Thrift server). My query selects only one STRING column from the data, but filters on other columns. Types: col1 = STRING, col2 = STRING, col3 = STRING, col4 = partition field (type STRING). Queries:
> cache table table1;
> -- Run some other queries on other data
> select col1 from table1 where col2 = 'foo' and col3 = 'bar' and col4 = 'foobar' and col1 is not null limit 100
> A fairly simple query. When I run this in SQuirreL SQL I get no results; when I remove "and col1 is not null" I get 100 rows of null. When I run it in beeline (the one in the spark-1.1.0-SNAPSHOT) I get no results, and when I remove "and col1 is not null" I get 100 rows of null. Note: both of these happened after I ran some other queries on other columns, and after I ran CACHE TABLE table1 first, before any queries. That seemed interesting to me... So I went to the spark-shell to determine whether it was a Spark issue or a Thrift issue. I ran:
> val hiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
> import hiveContext._
> cacheTable("table1")
> Then I ran the same other queries and got results, and then I ran the query above and got results as expected. Interestingly enough, if I don't cache the table through "cache table table1" in Thrift, I get results for all queries. If I uncache, I start getting results again. I hope I was clear enough here; I am happy to help however I can. John
cache table with JDBC
I am using Spark's Thrift server to connect to Hive and use JDBC to issue queries. Is there a way to cache a table in Spark via a JDBC call? Thanks, Ken
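Yes: CACHE TABLE is a SQL statement, so it can be issued over the JDBC connection like any other statement. A sketch against the Thrift server (host, port, and table name are placeholders):

```scala
import java.sql.DriverManager

// Connect to the Spark Thrift server over the HiveServer2 JDBC protocol.
Class.forName("org.apache.hive.jdbc.HiveDriver")
val conn = DriverManager.getConnection("jdbc:hive2://localhost:10000/default")
val stmt = conn.createStatement()

// The cache lives in the Thrift server, so it benefits later queries too,
// not just this connection's statements.
stmt.execute("CACHE TABLE my_table")
val rs = stmt.executeQuery("SELECT count(*) FROM my_table")
while (rs.next()) println(rs.getLong(1))
conn.close()
```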
Potential Thrift Server Bug on Spark SQL,perhaps with cache table?
I am working with Spark SQL and the Thrift server. I ran into an interesting bug, and I am curious what information/testing I can provide to help narrow things down. My setup is as follows: Hive 0.12 with a table that has lots of columns (50+), stored as RCFile; Spark-1.1.0-SNAPSHOT with Hive built in (and the Thrift server). My query selects only one STRING column from the data, but filters on other columns. Types: col1 = STRING, col2 = STRING, col3 = STRING, col4 = partition field (type STRING). Queries:
cache table table1;
-- Run some other queries on other data
select col1 from table1 where col2 = 'foo' and col3 = 'bar' and col4 = 'foobar' and col1 is not null limit 100
A fairly simple query. When I run this in SQuirreL SQL I get no results; when I remove "and col1 is not null" I get 100 rows of null. When I run it in beeline (the one in the spark-1.1.0-SNAPSHOT) I get no results, and when I remove "and col1 is not null" I get 100 rows of null. Note: both of these happened after I ran some other queries on other columns, and after I ran CACHE TABLE table1 first, before any queries. That seemed interesting to me... So I went to the spark-shell to determine whether it was a Spark issue or a Thrift issue. I ran:
val hiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
import hiveContext._
cacheTable("table1")
Then I ran the same other queries and got results, and then I ran the query above and got results as expected. Interestingly enough, if I don't cache the table through "cache table table1" in Thrift, I get results for all queries. If I uncache, I start getting results again. I hope I was clear enough here; I am happy to help however I can. John