Are you saying that HiveContext.sql(...) runs on Hive, and not on Spark SQL?
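[Editor's note: one way to check this empirically is to print the query plan. The sketch below assumes the spark-shell of this thread (Spark 1.x, `sc` predefined) and the `oraclehadoop` tables from the messages below; it is an illustration, not part of the original exchange.]

```scala
// Sketch (Spark 1.x API): HiveContext.sql(...) is parsed and executed by
// Spark SQL's own engine; Hive supplies the metastore and the table data.
// explain(true) prints the logical and physical plans, which show Spark
// operators (HiveTableScan, aggregate, join) rather than a Hive job.
val hiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
hiveContext.sql("use oraclehadoop")
hiveContext.sql(
  """SELECT t.calendar_month_desc, SUM(s.amount_sold) AS TotalSales
    |FROM smallsales s INNER JOIN times t ON s.time_id = t.time_id
    |GROUP BY t.calendar_month_desc""".stripMargin).explain(true)
```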
On Wed, Feb 24, 2016 at 1:27 AM, Sabarish Sasidharan <sabarish.sasidha...@manthan.com> wrote:

> When using SQL, your full query, including the joins, was executed in Hive (or the RDBMS) and only the results were brought into the Spark cluster. In the FP case, the data for the 3 tables is first pulled into the Spark cluster and then the join is executed.
>
> Thus the time difference.
>
> It's not immediately obvious why the results are different.
>
> Regards
> Sab
>
> On 24-Feb-2016 5:40 am, "Mich Talebzadeh" <mich.talebza...@cloudtechnologypartners.co.uk> wrote:
>
>> Hi,
>>
>> First, thanks everyone for the suggestions. Much appreciated.
>>
>> These were the original queries, written in SQL and run against spark-shell:
>>
>> val HiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
>> println("\nStarted at"); HiveContext.sql("SELECT FROM_unixtime(unix_timestamp(), 'dd/MM/yyyy HH:mm:ss.ss')").collect.foreach(println)
>> HiveContext.sql("use oraclehadoop")
>>
>> val rs = HiveContext.sql(
>>   """
>>   SELECT t.calendar_month_desc, c.channel_desc, SUM(s.amount_sold) AS TotalSales
>>   FROM smallsales s
>>   INNER JOIN times t
>>   ON s.time_id = t.time_id
>>   INNER JOIN channels c
>>   ON s.channel_id = c.channel_id
>>   GROUP BY t.calendar_month_desc, c.channel_desc
>>   """)
>> rs.registerTempTable("tmp")
>>
>> println("\nfirst query")
>> HiveContext.sql("""
>>   SELECT calendar_month_desc AS MONTH, channel_desc AS CHANNEL, TotalSales
>>   FROM tmp
>>   ORDER BY MONTH, CHANNEL LIMIT 5
>>   """).collect.foreach(println)
>>
>> println("\nsecond query")
>> HiveContext.sql("""
>>   SELECT channel_desc AS CHANNEL, MAX(TotalSales) AS SALES
>>   FROM tmp
>>   GROUP BY channel_desc
>>   ORDER BY SALES DESC LIMIT 5
>>   """).collect.foreach(println)
>>
>> println("\nFinished at"); HiveContext.sql("SELECT FROM_unixtime(unix_timestamp(), 'dd/MM/yyyy HH:mm:ss.ss')").collect.foreach(println)
>> sys.exit
>>
>> The second set of queries was written in FP style, as far as I could manage:
>>
>> val HiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
>> println("\nStarted at"); HiveContext.sql("SELECT FROM_unixtime(unix_timestamp(), 'dd/MM/yyyy HH:mm:ss.ss')").collect.foreach(println)
>> HiveContext.sql("use oraclehadoop")
>>
>> var s = HiveContext.sql("SELECT AMOUNT_SOLD, TIME_ID, CHANNEL_ID FROM sales")
>> val c = HiveContext.sql("SELECT CHANNEL_ID, CHANNEL_DESC FROM channels")
>> val t = HiveContext.sql("SELECT TIME_ID, CALENDAR_MONTH_DESC FROM times")
>> val rs = s.join(t, "time_id").join(c, "channel_id").groupBy("calendar_month_desc", "channel_desc").agg(sum("amount_sold").as("TotalSales"))
>>
>> println("\nfirst query")
>> val rs1 = rs.orderBy("calendar_month_desc", "channel_desc").take(5).foreach(println)
>>
>> println("\nsecond query")
>> val rs2 = rs.groupBy("channel_desc").agg(max("TotalSales").as("SALES")).orderBy("SALES").sort(desc("SALES")).take(5).foreach(println)
>>
>> println("\nFinished at"); HiveContext.sql("SELECT FROM_unixtime(unix_timestamp(), 'dd/MM/yyyy HH:mm:ss.ss')").collect.foreach(println)
>> sys.exit
>>
>> However, the first-query results are slightly different between SQL and FP (maybe the first-query code in FP is not exactly correct?) and, more importantly, the FP version takes an order of magnitude longer than the SQL one (8 minutes compared to less than a minute). I am not surprised, as I expected the functional style would have to flatten all those method calls and convert them to SQL?
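[Editor's note: one detail visible in the two listings above, offered as an observation rather than a confirmed diagnosis: the SQL version reads FROM smallsales, while the FP version builds `s` from the larger `sales` table, which by itself could explain the differing first-query results. A sketch of the FP pipeline reading the same table:]

```scala
import org.apache.spark.sql.functions.sum

// Hypothetical fix sketch (Spark 1.x): select from smallsales in both
// versions so the SQL and FP runs aggregate the same rows.
val s = HiveContext.sql("SELECT amount_sold, time_id, channel_id FROM smallsales")
val c = HiveContext.sql("SELECT channel_id, channel_desc FROM channels")
val t = HiveContext.sql("SELECT time_id, calendar_month_desc FROM times")
val rs = s.join(t, "time_id").join(c, "channel_id")
  .groupBy("calendar_month_desc", "channel_desc")
  .agg(sum("amount_sold").as("TotalSales"))
```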
>> *The standard SQL results*
>>
>> Started at
>> [23/02/2016 23:55:30.30]
>> res1: org.apache.spark.sql.DataFrame = [result: string]
>> rs: org.apache.spark.sql.DataFrame = [calendar_month_desc: string, channel_desc: string, TotalSales: decimal(20,0)]
>>
>> first query
>> [1998-01,Direct Sales,9161730]
>> [1998-01,Internet,1248581]
>> [1998-01,Partners,2409776]
>> [1998-02,Direct Sales,9161840]
>> [1998-02,Internet,1533193]
>>
>> second query
>> [Direct Sales,9161840]
>> [Internet,3977374]
>> [Partners,3976291]
>> [Tele Sales,328760]
>>
>> Finished at
>> [23/02/2016 23:56:11.11]
>>
>> *The FP results*
>>
>> Started at
>> [23/02/2016 23:45:58.58]
>> res1: org.apache.spark.sql.DataFrame = [result: string]
>> s: org.apache.spark.sql.DataFrame = [AMOUNT_SOLD: decimal(10,0), TIME_ID: timestamp, CHANNEL_ID: bigint]
>> c: org.apache.spark.sql.DataFrame = [CHANNEL_ID: double, CHANNEL_DESC: string]
>> t: org.apache.spark.sql.DataFrame = [TIME_ID: timestamp, CALENDAR_MONTH_DESC: string]
>> rs: org.apache.spark.sql.DataFrame = [calendar_month_desc: string, channel_desc: string, TotalSales: decimal(20,0)]
>>
>> first query
>> [1998-01,Direct Sales,9086830]
>> [1998-01,Internet,1247641]
>> [1998-01,Partners,2393567]
>> [1998-02,Direct Sales,9161840]
>> [1998-02,Internet,1533193]
>> rs1: Unit = ()
>>
>> second query
>> [Direct Sales,9161840]
>> [Internet,3977374]
>> [Partners,3976291]
>> [Tele Sales,328760]
>> rs2: Unit = ()
>>
>> Finished at
>> [23/02/2016 23:53:42.42]
>>
>> On 22/02/2016 23:16, Mich Talebzadeh wrote:
>>
>> Hi,
>>
>> I have data stored in Hive tables on which I want to do some simple manipulation.
>>
>> Currently in Spark I do the following: I get the result set using SQL from Hive tables and register it as a temporary table in Spark.
>>
>> Ideally I would like to get the result set into a DF and work on the DF to slice and dice the data using functional programming with filter, map, split etc.
>> I wanted to get some ideas on how to go about it.
>>
>> thanks
>>
>> val HiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
>>
>> HiveContext.sql("use oraclehadoop")
>> val rs = HiveContext.sql("""SELECT t.calendar_month_desc, c.channel_desc, SUM(s.amount_sold) AS TotalSales
>>   FROM smallsales s, times t, channels c
>>   WHERE s.time_id = t.time_id
>>   AND s.channel_id = c.channel_id
>>   GROUP BY t.calendar_month_desc, c.channel_desc
>>   """)
>> *rs.registerTempTable("tmp")*
>>
>> HiveContext.sql("""
>>   SELECT calendar_month_desc AS MONTH, channel_desc AS CHANNEL, TotalSales
>>   FROM tmp
>>   ORDER BY MONTH, CHANNEL
>>   """).collect.foreach(println)
>> HiveContext.sql("""
>>   SELECT channel_desc AS CHANNEL, MAX(TotalSales) AS SALES
>>   FROM tmp
>>   GROUP BY channel_desc
>>   ORDER BY SALES DESC
>>   """).collect.foreach(println)
>>
>> --
>>
>> Dr Mich Talebzadeh
>>
>> LinkedIn https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>> http://talebzadehmich.wordpress.com
>>
>> NOTE: The information in this email is proprietary and confidential. This message is for the designated recipient only; if you are not the intended recipient, you should destroy it immediately. Any information in this message shall not be understood as given or endorsed by Cloud Technology Partners Ltd, its subsidiaries or their employees, unless expressly so stated. It is the responsibility of the recipient to ensure that this email is virus free; therefore neither Cloud Technology Partners Ltd, its subsidiaries nor their employees accept any responsibility.
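[Editor's note on the `rs1: Unit = ()` lines in the FP output above: `take(5).foreach(println)` returns Unit, so nothing useful is bound to `rs1`/`rs2`. A sketch, under the same Spark 1.x assumptions as the thread, that keeps the DataFrames and caches the shared aggregate so the second query does not recompute the join:]

```scala
import org.apache.spark.sql.functions.{sum, max, desc}

// Cache the shared join + aggregate; both downstream queries reuse it
// instead of re-reading and re-joining the three tables.
val rs = s.join(t, "time_id").join(c, "channel_id")
  .groupBy("calendar_month_desc", "channel_desc")
  .agg(sum("amount_sold").as("TotalSales"))
  .cache()

// First query: keep the DataFrame and print as a separate action.
val rs1 = rs.orderBy("calendar_month_desc", "channel_desc").limit(5)
rs1.collect.foreach(println)

// Second query: a single sort(desc(...)) suffices; in the original code
// the orderBy("SALES") was immediately overridden by sort(desc("SALES")).
val rs2 = rs.groupBy("channel_desc")
  .agg(max("TotalSales").as("SALES"))
  .sort(desc("SALES"))
  .limit(5)
rs2.collect.foreach(println)
```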