Yes

Regards
Sab

On 24-Feb-2016 9:15 pm, "Koert Kuipers" <ko...@tresata.com> wrote:

> Are you saying that HiveContext.sql(...) runs on Hive, and not on Spark
> SQL?
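One way to settle this empirically: both forms yield a DataFrame, and explain(true) prints the logical and physical plans Spark will run. A minimal sketch from the spark-shell, assuming Spark 1.x and the oraclehadoop tables that appear later in this thread; if both plans end in Spark SQL operators (HiveTableScan feeding joins and aggregates), then both the SQL string and the method calls are planned and executed by Spark SQL, with Hive supplying the metastore and storage:

val HiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
HiveContext.sql("use oraclehadoop")

// Plans for the SQL form of the query.
HiveContext.sql("""
SELECT t.calendar_month_desc, c.channel_desc, SUM(s.amount_sold) AS TotalSales
FROM smallsales s
INNER JOIN times t ON s.time_id = t.time_id
INNER JOIN channels c ON s.channel_id = c.channel_id
GROUP BY t.calendar_month_desc, c.channel_desc
""").explain(true)

// Plans for the equivalent DataFrame (FP) form.
import org.apache.spark.sql.functions.sum
val s = HiveContext.sql("SELECT AMOUNT_SOLD, TIME_ID, CHANNEL_ID FROM smallsales")
val t = HiveContext.sql("SELECT TIME_ID, CALENDAR_MONTH_DESC FROM times")
val c = HiveContext.sql("SELECT CHANNEL_ID, CHANNEL_DESC FROM channels")
s.join(t, "time_id").join(c, "channel_id")
  .groupBy("calendar_month_desc", "channel_desc")
  .agg(sum("amount_sold").as("TotalSales"))
  .explain(true)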
> On Wed, Feb 24, 2016 at 1:27 AM, Sabarish Sasidharan
> <sabarish.sasidha...@manthan.com> wrote:
>
>> When using SQL, your full query, including the joins, was executed in
>> Hive (or the RDBMS) and only the results were brought into the Spark
>> cluster. In the FP case, the data for the 3 tables is first pulled into
>> the Spark cluster and then the join is executed.
>>
>> Hence the time difference.
>>
>> It's not immediately obvious why the results are different.
>>
>> Regards
>> Sab
>>
>> On 24-Feb-2016 5:40 am, "Mich Talebzadeh"
>> <mich.talebza...@cloudtechnologypartners.co.uk> wrote:
>>
>>> Hi,
>>>
>>> First, thanks everyone for the suggestions. Much appreciated.
>>>
>>> These were the original queries, written in SQL and run against
>>> spark-shell:
>>>
>>> val HiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
>>> println("\nStarted at"); HiveContext.sql("SELECT
>>> FROM_unixtime(unix_timestamp(), 'dd/MM/yyyy HH:mm:ss.ss')
>>> ").collect.foreach(println)
>>> HiveContext.sql("use oraclehadoop")
>>>
>>> val rs = HiveContext.sql(
>>> """
>>> SELECT t.calendar_month_desc, c.channel_desc, SUM(s.amount_sold) AS TotalSales
>>> FROM smallsales s
>>> INNER JOIN times t
>>> ON s.time_id = t.time_id
>>> INNER JOIN channels c
>>> ON s.channel_id = c.channel_id
>>> GROUP BY t.calendar_month_desc, c.channel_desc
>>> """)
>>> rs.registerTempTable("tmp")
>>> println("\nfirst query")
>>> HiveContext.sql("""
>>> SELECT calendar_month_desc AS MONTH, channel_desc AS CHANNEL, TotalSales
>>> FROM tmp
>>> ORDER BY MONTH, CHANNEL LIMIT 5
>>> """).collect.foreach(println)
>>> println("\nsecond query")
>>> HiveContext.sql("""
>>> SELECT channel_desc AS CHANNEL, MAX(TotalSales) AS SALES
>>> FROM tmp
>>> GROUP BY channel_desc
>>> ORDER BY SALES DESC LIMIT 5
>>> """).collect.foreach(println)
>>> println("\nFinished at"); HiveContext.sql("SELECT
>>> FROM_unixtime(unix_timestamp(), 'dd/MM/yyyy HH:mm:ss.ss')
>>> ").collect.foreach(println)
>>> sys.exit
>>>
>>> The second set of queries was written in FP style, as far as I could
>>> manage, as below:
>>>
>>> val HiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
>>> println("\nStarted at"); HiveContext.sql("SELECT
>>> FROM_unixtime(unix_timestamp(), 'dd/MM/yyyy HH:mm:ss.ss')
>>> ").collect.foreach(println)
>>> HiveContext.sql("use oraclehadoop")
>>> val s = HiveContext.sql("SELECT AMOUNT_SOLD, TIME_ID, CHANNEL_ID FROM sales")
>>> val c = HiveContext.sql("SELECT CHANNEL_ID, CHANNEL_DESC FROM channels")
>>> val t = HiveContext.sql("SELECT TIME_ID, CALENDAR_MONTH_DESC FROM times")
>>> val rs = s.join(t, "time_id").join(c, "channel_id")
>>>   .groupBy("calendar_month_desc", "channel_desc")
>>>   .agg(sum("amount_sold").as("TotalSales"))
>>> println("\nfirst query")
>>> val rs1 = rs.orderBy("calendar_month_desc", "channel_desc")
>>>   .take(5).foreach(println)
>>> println("\nsecond query")
>>> val rs2 = rs.groupBy("channel_desc").agg(max("TotalSales").as("SALES"))
>>>   .sort(desc("SALES")).take(5).foreach(println)
>>> println("\nFinished at"); HiveContext.sql("SELECT
>>> FROM_unixtime(unix_timestamp(), 'dd/MM/yyyy HH:mm:ss.ss')
>>> ").collect.foreach(println)
>>> sys.exit
>>>
>>> However, the first query's results differ slightly between SQL and FP
>>> (maybe the first query's FP code is not exactly correct?), and, more
>>> importantly, the FP version takes an order of magnitude longer than the
>>> SQL one (8 minutes compared with less than a minute). I am not
>>> surprised, as I expect Functional Programming has to flatten all those
>>> method calls and convert them to SQL?
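Before reading too much into the differing outputs, note one asymmetry between the two scripts above: the SQL version reads from smallsales, while the FP version reads from sales. Aggregating two different fact tables would by itself account for the differing first-query results, and, if sales is the larger table, possibly for part of the timing gap too. A minimal sketch of the FP pipeline pointed at the same table, reusing HiveContext, t and c from the script above:

val sSmall = HiveContext.sql("SELECT AMOUNT_SOLD, TIME_ID, CHANNEL_ID FROM smallsales")
val rsSmall = sSmall.join(t, "time_id").join(c, "channel_id")
  .groupBy("calendar_month_desc", "channel_desc")
  .agg(sum("amount_sold").as("TotalSales"))
// If the table mismatch is the cause, this should reproduce the first five
// rows printed by the SQL version.
rsSmall.orderBy("calendar_month_desc", "channel_desc").take(5).foreach(println)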
>>> *The standard SQL results*
>>>
>>> Started at
>>> [23/02/2016 23:55:30.30]
>>> res1: org.apache.spark.sql.DataFrame = [result: string]
>>> rs: org.apache.spark.sql.DataFrame = [calendar_month_desc: string,
>>> channel_desc: string, TotalSales: decimal(20,0)]
>>>
>>> first query
>>> [1998-01,Direct Sales,9161730]
>>> [1998-01,Internet,1248581]
>>> [1998-01,Partners,2409776]
>>> [1998-02,Direct Sales,9161840]
>>> [1998-02,Internet,1533193]
>>>
>>> second query
>>> [Direct Sales,9161840]
>>> [Internet,3977374]
>>> [Partners,3976291]
>>> [Tele Sales,328760]
>>>
>>> Finished at
>>> [23/02/2016 23:56:11.11]
>>>
>>> *The FP results*
>>>
>>> Started at
>>> [23/02/2016 23:45:58.58]
>>> res1: org.apache.spark.sql.DataFrame = [result: string]
>>> s: org.apache.spark.sql.DataFrame = [AMOUNT_SOLD: decimal(10,0),
>>> TIME_ID: timestamp, CHANNEL_ID: bigint]
>>> c: org.apache.spark.sql.DataFrame = [CHANNEL_ID: double, CHANNEL_DESC:
>>> string]
>>> t: org.apache.spark.sql.DataFrame = [TIME_ID: timestamp,
>>> CALENDAR_MONTH_DESC: string]
>>> rs: org.apache.spark.sql.DataFrame = [calendar_month_desc: string,
>>> channel_desc: string, TotalSales: decimal(20,0)]
>>>
>>> first query
>>> [1998-01,Direct Sales,9086830]
>>> [1998-01,Internet,1247641]
>>> [1998-01,Partners,2393567]
>>> [1998-02,Direct Sales,9161840]
>>> [1998-02,Internet,1533193]
>>> rs1: Unit = ()
>>>
>>> second query
>>> [Direct Sales,9161840]
>>> [Internet,3977374]
>>> [Partners,3976291]
>>> [Tele Sales,328760]
>>> rs2: Unit = ()
>>>
>>> Finished at
>>> [23/02/2016 23:53:42.42]
>>>
>>> On 22/02/2016 23:16, Mich Talebzadeh wrote:
>>>
>>> Hi,
>>>
>>> I have data stored in Hive tables on which I want to do some simple
>>> manipulation.
>>>
>>> Currently in Spark I do the following: I get the result set from the
>>> Hive tables using SQL and register it as a temporary table in Spark.
>>>
>>> Ideally I would get the result set into a DataFrame and work on the
>>> DataFrame to slice and dice the data using functional programming with
>>> filter, map, split etc.
>>>
>>> I wanted to get some ideas on how to go about it.
>>>
>>> Thanks
>>>
>>> val HiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
>>>
>>> HiveContext.sql("use oraclehadoop")
>>> val rs = HiveContext.sql("""SELECT t.calendar_month_desc,
>>> c.channel_desc, SUM(s.amount_sold) AS TotalSales
>>> FROM smallsales s, times t, channels c
>>> WHERE s.time_id = t.time_id
>>> AND s.channel_id = c.channel_id
>>> GROUP BY t.calendar_month_desc, c.channel_desc
>>> """)
>>> *rs.registerTempTable("tmp")*
>>>
>>> HiveContext.sql("""
>>> SELECT calendar_month_desc AS MONTH, channel_desc AS CHANNEL, TotalSales
>>> FROM tmp
>>> ORDER BY MONTH, CHANNEL
>>> """).collect.foreach(println)
>>> HiveContext.sql("""
>>> SELECT channel_desc AS CHANNEL, MAX(TotalSales) AS SALES
>>> FROM tmp
>>> GROUP BY channel_desc
>>> ORDER BY SALES DESC
>>> """).collect.foreach(println)
>>>
>>> --
>>>
>>> Dr Mich Talebzadeh
>>>
>>> LinkedIn https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>>> http://talebzadehmich.wordpress.com
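On the original question: HiveContext.sql() already returns a DataFrame, so the result set can be sliced and diced directly with the functional operators, without going through a temporary table at all. A minimal sketch along those lines, assuming Spark 1.x; the "Direct Sales" filter value is just an illustrative pick:

val HiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
HiveContext.sql("use oraclehadoop")

// rs is a DataFrame, not just a handle for registerTempTable.
val rs = HiveContext.sql("""SELECT t.calendar_month_desc, c.channel_desc,
SUM(s.amount_sold) AS TotalSales
FROM smallsales s, times t, channels c
WHERE s.time_id = t.time_id
AND s.channel_id = c.channel_id
GROUP BY t.calendar_month_desc, c.channel_desc""")

// Slice and dice with the DataFrame API: filter rows, project columns, sort.
val directSales = rs.filter(rs("channel_desc") === "Direct Sales")
  .select("calendar_month_desc", "TotalSales")
  .orderBy("calendar_month_desc")
directSales.collect.foreach(println)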