Hi,

TOOLS

Spark 1.5.2, Hadoop 2.6, Hive 2.0, spark-shell, Hive database

OBJECTIVES

Compare the timings of running Spark using SQL against running Spark using functional programming (FP), i.e. functional calls, on Hive tables.

UNDERLYING TABLES

Three tables in a Hive database, stored in ORC format.

The main differences in timings come from running the queries and fetching the data. If you look at the transformation part, that is

val rs = s.join(t,"time_id").join(c,"channel_id").groupBy("calendar_month_desc","channel_desc").agg(sum("amount_sold").as("TotalSales"))

it takes 1 second. On the other hand, using SQL the first query takes 19 seconds, compared to just under 4 minutes for functional programming. The second query takes 18 seconds using SQL (going by the timestamps in the results below); using FP it takes around 4 minutes.

These are my assumptions:

* Running SQL, the full query is executed in Hive, which means that Hive can take advantage of ORC optimizations, storage indexes etc.?
* Running FP requires that the data is fetched from the underlying tables in Hive and brought back to the Spark cluster (standalone here), and the joins etc. are done there.

The next steps for me would be to:

* Look at the query plans in Spark
* Run the same code on Hive alone and compare results

Any other suggestions are welcome.

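For the first of those next steps, here is a minimal sketch of how the two plans could be put side by side in spark-shell. It assumes the same oraclehadoop tables as in the code further down. explain(true) is the regular DataFrame call that prints the logical and physical plans, so it should show where the joins actually run in each case. The names hc, sqlRs, fpRs and the timed helper are hypothetical and not part of the original scripts or of Spark. Two caveats: DataFrame transformations are lazy, so the 1 second for building rs covers only plan construction and the real work happens at the action (take or collect), which is what timed wraps. Also, the SQL script reads smallsales while the FP script reads sales; the sketch uses sales for both so the comparison is like for like, and that difference may be worth ruling out first.

```scala
// Sketch for spark-shell (Spark 1.5.x), where sc is predefined.
import org.apache.spark.sql.functions.sum

val hc = new org.apache.spark.sql.hive.HiveContext(sc)
hc.sql("use oraclehadoop")

// Hypothetical helper (not part of Spark): time a block on the driver.
def timed[T](label: String)(body: => T): T = {
  val start = System.nanoTime()
  val result = body
  val seconds = (System.nanoTime() - start) / 1e9
  println(f"$label took $seconds%.1f s")
  result
}

// SQL version. Assumption: reads "sales" (not "smallsales") so that
// both versions scan the same table.
val sqlRs = hc.sql("""
  SELECT t.calendar_month_desc, c.channel_desc, SUM(s.amount_sold) AS TotalSales
  FROM sales s
  INNER JOIN times t ON s.time_id = t.time_id
  INNER JOIN channels c ON s.channel_id = c.channel_id
  GROUP BY t.calendar_month_desc, c.channel_desc
""")

// FP version, same tables and columns as the FP script.
val s = hc.table("sales").select("AMOUNT_SOLD", "TIME_ID", "CHANNEL_ID")
val c = hc.table("channels").select("CHANNEL_ID", "CHANNEL_DESC")
val t = hc.table("times").select("TIME_ID", "CALENDAR_MONTH_DESC")
val fpRs = s.join(t, "time_id").join(c, "channel_id").
  groupBy("calendar_month_desc", "channel_desc").
  agg(sum("amount_sold").as("TotalSales"))

// Print logical and physical plans; nothing is executed yet.
sqlRs.explain(true)
fpRs.explain(true)

// The actions trigger execution, so this is where the time is spent.
timed("SQL first query") { sqlRs.orderBy("calendar_month_desc", "channel_desc").take(5).foreach(println) }
timed("FP first query") { fpRs.orderBy("calendar_month_desc", "channel_desc").take(5).foreach(println) }
```

If the two physical plans turn out to be the same, the gap is more likely to come from the input tables than from SQL versus FP as such.
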
STANDARD SQL CODE

val HiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
println ("\nStarted at"); HiveContext.sql("SELECT FROM_unixtime(unix_timestamp(), 'dd/MM/yyyy HH:mm:ss.ss') ").collect.foreach(println)
HiveContext.sql("use oraclehadoop")
println ("\ncreating data set at "); HiveContext.sql("SELECT FROM_unixtime(unix_timestamp(), 'dd/MM/yyyy HH:mm:ss.ss') ").collect.foreach(println)
// Join the three tables and total the sales per month and channel
val rs = HiveContext.sql(
"""
SELECT    t.calendar_month_desc
        , c.channel_desc
        , SUM(s.amount_sold) AS TotalSales
FROM smallsales s
INNER JOIN times t
ON s.time_id = t.time_id
INNER JOIN channels c
ON s.channel_id = c.channel_id
GROUP BY t.calendar_month_desc, c.channel_desc
""")
rs.registerTempTable("tmp")
// First query: first 5 rows ordered by month and channel
println ("\nfirst query at"); HiveContext.sql("SELECT FROM_unixtime(unix_timestamp(), 'dd/MM/yyyy HH:mm:ss.ss') ").collect.foreach(println)
HiveContext.sql("""
SELECT calendar_month_desc AS MONTH, channel_desc AS CHANNEL, TotalSales
from tmp
ORDER BY MONTH, CHANNEL LIMIT 5
""").collect.foreach(println)
// Second query: highest monthly total per channel
println ("\nsecond query at"); HiveContext.sql("SELECT FROM_unixtime(unix_timestamp(), 'dd/MM/yyyy HH:mm:ss.ss') ").collect.foreach(println)
HiveContext.sql("""
SELECT channel_desc AS CHANNEL, MAX(TotalSales) AS SALES
FROM tmp
GROUP BY channel_desc
order by SALES DESC LIMIT 5
""").collect.foreach(println)
println ("\nFinished at"); HiveContext.sql("SELECT FROM_unixtime(unix_timestamp(), 'dd/MM/yyyy HH:mm:ss.ss') ").collect.foreach(println)
sys.exit

RESULTS

Started at
[24/02/2016 09:00:50.50]
res1: org.apache.spark.sql.DataFrame = [result: string]

creating data set at
[24/02/2016 09:00:53.53]
rs: org.apache.spark.sql.DataFrame = [calendar_month_desc: string, channel_desc: string, TotalSales: decimal(20,0)]

first query at
[24/02/2016 09:00:54.54]
[1998-01,Direct Sales,9161730]
[1998-01,Internet,1248581]
[1998-01,Partners,2409776]
[1998-02,Direct Sales,9161840]
[1998-02,Internet,1533193]

second query at
[24/02/2016 09:01:13.13]
[Direct Sales,9161840]
[Internet,3977374]
[Partners,3976291]
[Tele Sales,328760]

Finished at
[24/02/2016 09:01:31.31]

CODE USING FUNCTIONAL PROGRAMMING

// sum, max and desc come from org.apache.spark.sql.functions
import org.apache.spark.sql.functions.{sum, max, desc}
val HiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
println ("\nStarted at"); HiveContext.sql("SELECT FROM_unixtime(unix_timestamp(), 'dd/MM/yyyy HH:mm:ss.ss') ").collect.foreach(println)
HiveContext.sql("use oraclehadoop")
// Select only the columns needed from each table
val s = HiveContext.table("sales").select("AMOUNT_SOLD","TIME_ID","CHANNEL_ID")
val c = HiveContext.table("channels").select("CHANNEL_ID","CHANNEL_DESC")
val t = HiveContext.table("times").select("TIME_ID","CALENDAR_MONTH_DESC")
println ("\ncreating data set at"); HiveContext.sql("SELECT FROM_unixtime(unix_timestamp(), 'dd/MM/yyyy HH:mm:ss.ss') ").collect.foreach(println)
// Join the three tables and total the sales per month and channel
val rs = s.join(t,"time_id").join(c,"channel_id").groupBy("calendar_month_desc","channel_desc").agg(sum("amount_sold").as("TotalSales"))
// First query: first 5 rows ordered by month and channel
println ("\nfirst query at"); HiveContext.sql("SELECT FROM_unixtime(unix_timestamp(), 'dd/MM/yyyy HH:mm:ss.ss') ").collect.foreach(println)
val rs1 = rs.orderBy("calendar_month_desc","channel_desc").take(5).foreach(println)
// Second query: highest monthly total per channel
println ("\nsecond query at"); HiveContext.sql("SELECT FROM_unixtime(unix_timestamp(), 'dd/MM/yyyy HH:mm:ss.ss') ").collect.foreach(println)
val rs2 = rs.groupBy("channel_desc").agg(max("TotalSales").as("SALES")).orderBy("SALES").sort(desc("SALES")).take(5).foreach(println)
println ("\nFinished at"); HiveContext.sql("SELECT FROM_unixtime(unix_timestamp(), 'dd/MM/yyyy HH:mm:ss.ss') ").collect.foreach(println)
sys.exit

RESULTS

Started at
[24/02/2016 08:52:27.27]
res1: org.apache.spark.sql.DataFrame = [result: string]
s: org.apache.spark.sql.DataFrame = [AMOUNT_SOLD: decimal(10,0), TIME_ID: timestamp, CHANNEL_ID: bigint]
c: org.apache.spark.sql.DataFrame = [CHANNEL_ID: double, CHANNEL_DESC: string]
t: org.apache.spark.sql.DataFrame = [TIME_ID: timestamp, CALENDAR_MONTH_DESC: string]

creating data set at
[24/02/2016 08:52:30.30]
rs: org.apache.spark.sql.DataFrame = [calendar_month_desc: string, channel_desc: string, TotalSales: decimal(20,0)]

first query at
[24/02/2016 08:52:31.31]
[1998-01,Direct Sales,9086830]
[1998-01,Internet,1247641]
[1998-01,Partners,2393567]
[1998-02,Direct Sales,9161840]
[1998-02,Internet,1533193]
rs1: Unit = ()

second query at
[24/02/2016 08:56:17.17]
[Direct Sales,9161840]
[Internet,3977374]
[Partners,3976291]
[Tele Sales,328760]
rs2: Unit = ()

Finished at
[24/02/2016 09:00:14.14]

On 24/02/2016 06:27, Sabarish Sasidharan wrote:
> When using SQL your full query, including the joins, were executed in Hive(or
> RDBMS) and only the results were brought into the Spark cluster. In the FP
> case, the data for the 3 tables is first pulled into the Spark cluster and
> then the join is executed.
>
> Thus the time difference.
>
> It's not immediately obvious why the results are different.
>
> Regards
> Sab