Spark SQL takes unexpected time
Hello,

I have written a Spark SQL application which reads data from HDFS and queries it. The data size is around 2GB (30 million records). The schema and the query I am running are below. The query takes around 05+ seconds to execute.

I tried adding rdd.persist(StorageLevel.MEMORY_AND_DISK) and rdd.cache(), but in both cases it takes extra time, even when I run the query below a second time on the data (assuming Spark would cache it during the first query).

    case class EventDataTbl(ID: String,
                            ONum: String,
                            RNum: String,
                            Timestamp: String,
                            Duration: String,
                            Type: String,
                            Source: String,
                            OName: String,
                            RName: String)

    sql("SELECT COUNT(*) AS Frequency,ONum,OName,RNum,RName FROM EventDataTbl GROUP BY ONum,OName,RNum,RName ORDER BY Frequency DESC LIMIT 10").collect().foreach(println)

Can you let me know if I am missing anything?

Thanks,
Shailesh

--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-SQL-takes-unexpected-time-tp17925.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org
Re: Spark SQL takes unexpected time
If you are running Spark 1.1 or earlier, you'll want to use rdd.registerTempTable() followed by sqlContext.cacheTable(), and then query that table. In those versions rdd.cache() does not use the optimized in-memory format and thus puts a lot of pressure on the GC. This is fixed in Spark 1.2, where .cache() should do what you want.

I'll also note that caching in SQL will actually make things slower if the data does not fit in memory. So you should look at the Storage tab of the Spark web UI and make sure all the partitions fit.

On Sun, Nov 2, 2014 at 8:47 PM, Shailesh Birari wrote:
> [...]
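The register-then-cache sequence described above can be sketched as follows. This is a minimal illustration against the Spark 1.1-era API (SQLContext, the createSchemaRDD implicit, registerTempTable, cacheTable), not the poster's actual program: the two-column Event schema, the table name "events", the in-memory sample rows, and the local[*] master are all assumptions made to keep the example small.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

// Hypothetical two-column schema standing in for the real EventDataTbl.
case class Event(ONum: String, RNum: String)

object CacheTableSketch {
  def main(args: Array[String]): Unit = {
    // local[*] is an assumption so the sketch can run without a cluster.
    val conf = new SparkConf().setAppName("CacheTableSketch").setMaster("local[*]")
    val sc = new SparkContext(conf)
    val sqlContext = new SQLContext(sc)
    // Implicit conversion from an RDD of case classes to a SchemaRDD (Spark 1.x before DataFrames).
    import sqlContext.createSchemaRDD

    // Sample rows in place of the HDFS file read in the thread.
    val events = sc.parallelize(Seq(Event("1", "2"), Event("1", "2"), Event("3", "4")))

    // Register the RDD as a table, then ask Spark SQL to cache it in its columnar format.
    events.registerTempTable("events")
    sqlContext.cacheTable("events")

    // Query the cached table rather than the raw RDD.
    sqlContext.sql(
      "SELECT COUNT(*) AS Frequency, ONum, RNum FROM events " +
      "GROUP BY ONum, RNum ORDER BY Frequency DESC LIMIT 10"
    ).collect().foreach(println)

    sc.stop()
  }
}
```

Depending on the release, the cache may only be built by the first query that scans the table, so that first run can include the cost of reading and parsing the input. Comparing the second run against the first, and checking the Storage tab as suggested above, gives a fairer picture of whether caching helps.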
Re: Spark SQL takes unexpected time
Yes, I am using Spark 1.1.0 and have used rdd.registerTempTable(). I tried adding sqlContext.cacheTable(), but it took 59 seconds (more than earlier).

I also tried changing the schema to use the Long data type for some fields, but the conversion seems to take more time. Is there any way to specify an index? I checked and didn't find any, but just want to confirm.

For your reference, here is the snippet of code.

    case class EventDataTbl(EventUID: Long,
                            ONum: Long,
                            RNum: Long,
                            Timestamp: java.sql.Timestamp,
                            Duration: String,
                            Type: String,
                            Source: String,
                            OName: String,
                            RName: String)

    val format = new java.text.SimpleDateFormat("-MM-dd hh:mm:ss")
    val cedFileName = "hdfs://hadoophost:8020/demo/poc/JoinCsv/output_2"
    // Split each CSV line (keeping trailing empty fields) and build one record per line.
    val cedRdd = sc.textFile(cedFileName).map(_.split(",", -1)).map(p =>
      EventDataTbl(p(0).toLong, p(1).toLong, p(2).toLong,
        new java.sql.Timestamp(format.parse(p(3)).getTime()),
        p(4), p(5), p(6), p(7), p(8)))

    cedRdd.registerTempTable("EventDataTbl")
    sqlCntxt.cacheTable("EventDataTbl")

    val t1 = System.nanoTime()
    println("\n\n10 Most frequent conversations between the Originators and Recipients\n")
    sql("SELECT COUNT(*) AS Frequency,ONum,OName,RNum,RName FROM EventDataTbl GROUP BY ONum,OName,RNum,RName ORDER BY Frequency DESC LIMIT 10").collect().foreach(println)
    val t2 = System.nanoTime()
    // System.nanoTime() returns nanoseconds, so divide by 1e9 to report seconds.
    println("Time taken " + (t2 - t1) / 1e9 + " Seconds")

Thanks,
Shailesh
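Since the snippet above times a single run with System.nanoTime(), a small helper that converts nanoseconds to seconds and times the same work twice may make the measurement easier to interpret. This is only a sketch: QueryTimer, elapsedSeconds, and timed are hypothetical names, and the summation is a placeholder for the sql(...).collect() call from the thread.

```scala
object QueryTimer {
  // Convert a difference of two System.nanoTime() readings to seconds.
  def elapsedSeconds(startNanos: Long, endNanos: Long): Double =
    (endNanos - startNanos) / 1e9

  // Evaluate `body` once and return its result together with the elapsed seconds.
  def timed[A](body: => A): (A, Double) = {
    val start = System.nanoTime()
    val result = body
    (result, elapsedSeconds(start, System.nanoTime()))
  }

  def main(args: Array[String]): Unit = {
    // Placeholder workload; in the thread this would be the SQL query.
    val (_, first) = timed { (1 to 1000000).map(_.toLong).sum }
    val (_, second) = timed { (1 to 1000000).map(_.toLong).sum }
    println(f"first run:  $first%.3f s")
    println(f"second run: $second%.3f s")
  }
}
```

Timing the first and second runs separately keeps any one-time cost (reading from HDFS, parsing, building the cache) from being attributed to the query itself.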
Re: Spark SQL takes unexpected time
Michael,

I should probably look more closely at the design of 1.2 vs 1.1 myself, but I've been curious why Spark's in-memory data uses the heap instead of being put off-heap. Was this the optimization that was done in 1.2 to alleviate GC pressure?

On Mon, Nov 3, 2014 at 8:52 PM, Shailesh Birari wrote:
> [...]
Re: Spark SQL takes unexpected time
People also store data off-heap by putting Parquet data into Tachyon.

The optimization in 1.2 is to use the in-memory columnar cached format instead of keeping row objects (and their boxed contents) around when you call .cache(). This significantly reduces the number of live objects, since you have a single byte buffer per column batch.

On Tue, Nov 4, 2014 at 5:19 AM, Corey Nolet wrote:
> [...]
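To illustrate the point about row objects versus one byte buffer per column batch, here is a toy sketch. It is not Spark's columnar cache implementation; ColumnBatchSketch, Row, packLongColumn, and readLong are hypothetical names, and the sketch only handles a single Long column to show how the number of objects the GC has to track differs between the two layouts.

```scala
import java.nio.ByteBuffer

object ColumnBatchSketch {
  // Row-oriented layout: one Row object per record, each referencing a boxed Long.
  case class Row(oNum: java.lang.Long)

  // Column-oriented layout: every value of the column for a batch packed into one buffer.
  def packLongColumn(values: Seq[Long]): ByteBuffer = {
    val buf = ByteBuffer.allocate(values.length * 8)
    values.foreach(v => buf.putLong(v))
    buf.flip() // switch the buffer from writing to reading
    buf
  }

  // Read the value at a row index using an absolute offset into the buffer.
  def readLong(column: ByteBuffer, index: Int): Long =
    column.getLong(index * 8)

  def main(args: Array[String]): Unit = {
    val values = (1L to 1000L).toSeq

    // About 2 objects per record: the Row itself plus its boxed field.
    val rows = values.map(v => Row(java.lang.Long.valueOf(v)))

    // A single buffer object for the whole batch, however many records it holds.
    val column = packLongColumn(values)

    println("row objects: " + rows.length + ", column buffers: 1")
    println("value at index 41: " + readLong(column, 41))
  }
}
```

With the row layout the number of live objects grows with the record count, while the packed layout keeps it at one buffer per column per batch, which is the effect on GC described above.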