Are you saying that HiveContext.sql(...) runs on Hive, and not on Spark SQL?

On Wed, Feb 24, 2016 at 1:27 AM, Sabarish Sasidharan <
sabarish.sasidha...@manthan.com> wrote:

> When using SQL, your full query, including the joins, was executed in
> Hive (or RDBMS) and only the results were brought into the Spark cluster. In
> the FP case, the data for the 3 tables is first pulled into the Spark
> cluster and then the join is executed.
>
> Thus the time difference.
>
> It's not immediately obvious why the results are different.
>
> Regards
> Sab
> On 24-Feb-2016 5:40 am, "Mich Talebzadeh" <
> mich.talebza...@cloudtechnologypartners.co.uk> wrote:
>
>>
>>
>> Hi,
>>
>> First thanks everyone for their suggestions. Much appreciated.
>>
>> These were the original queries, written in SQL and run in spark-shell:
>>
>> val HiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
>> println ("\nStarted at")
>> HiveContext.sql("SELECT FROM_unixtime(unix_timestamp(), 'dd/MM/yyyy HH:mm:ss.ss')").collect.foreach(println)
>> HiveContext.sql("use oraclehadoop")
>>
>> val rs = HiveContext.sql(
>> """
>> SELECT t.calendar_month_desc, c.channel_desc, SUM(s.amount_sold) AS
>> TotalSales
>> FROM smallsales s
>> INNER JOIN times t
>> ON s.time_id = t.time_id
>> INNER JOIN channels c
>> ON s.channel_id = c.channel_id
>> GROUP BY t.calendar_month_desc, c.channel_desc
>> """)
>> rs.registerTempTable("tmp")
>> println ("\nfirst query")
>> HiveContext.sql("""
>> SELECT calendar_month_desc AS MONTH, channel_desc AS CHANNEL, TotalSales
>> from tmp
>> ORDER BY MONTH, CHANNEL LIMIT 5
>> """).collect.foreach(println)
>> println ("\nsecond query")
>> HiveContext.sql("""
>> SELECT channel_desc AS CHANNEL, MAX(TotalSales)  AS SALES
>> FROM tmp
>> GROUP BY channel_desc
>> order by SALES DESC LIMIT 5
>> """).collect.foreach(println)
>> println ("\nFinished at")
>> HiveContext.sql("SELECT FROM_unixtime(unix_timestamp(), 'dd/MM/yyyy HH:mm:ss.ss')").collect.foreach(println)
>> sys.exit
>>
>> The second set of queries was written in FP style, as far as I could manage, as below:
>>
>> val HiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
>> println ("\nStarted at")
>> HiveContext.sql("SELECT FROM_unixtime(unix_timestamp(), 'dd/MM/yyyy HH:mm:ss.ss')").collect.foreach(println)
>> HiveContext.sql("use oraclehadoop")
>> val s = HiveContext.sql("SELECT AMOUNT_SOLD, TIME_ID, CHANNEL_ID FROM sales")
>> val c = HiveContext.sql("SELECT CHANNEL_ID, CHANNEL_DESC FROM channels")
>> val t = HiveContext.sql("SELECT TIME_ID, CALENDAR_MONTH_DESC FROM times")
>> val rs =
>> s.join(t,"time_id").join(c,"channel_id").groupBy("calendar_month_desc","channel_desc").agg(sum("amount_sold").as("TotalSales"))
>> println ("\nfirst query")
>> val rs1 =
>> rs.orderBy("calendar_month_desc","channel_desc").take(5).foreach(println)
>> println ("\nsecond query")
>> val rs2 = rs.groupBy("channel_desc").agg(max("TotalSales").as("SALES")).
>>   orderBy("SALES").sort(desc("SALES")).take(5).foreach(println)
>> println ("\nFinished at")
>> HiveContext.sql("SELECT FROM_unixtime(unix_timestamp(), 'dd/MM/yyyy HH:mm:ss.ss')").collect.foreach(println)
>> sys.exit
>>
>>
>>
>> However, the first query results are slightly different in SQL and FP (maybe
>> the first query code in FP is not exactly correct?) and, more importantly,
>> the FP version takes an order of magnitude longer than SQL (8 minutes compared
>> to less than a minute). I am not surprised, as I assume Functional
>> Programming has to flatten all those method calls and convert them to
>> SQL?
>>
>> *The standard SQL results*
>>
>>
>>
>> Started at
>> [23/02/2016 23:55:30.30]
>> res1: org.apache.spark.sql.DataFrame = [result: string]
>> rs: org.apache.spark.sql.DataFrame = [calendar_month_desc: string,
>> channel_desc: string, TotalSales: decimal(20,0)]
>>
>> first query
>> [1998-01,Direct Sales,9161730]
>> [1998-01,Internet,1248581]
>> [1998-01,Partners,2409776]
>> [1998-02,Direct Sales,9161840]
>> [1998-02,Internet,1533193]
>>
>>
>>
>> second query
>> [Direct Sales,9161840]
>> [Internet,3977374]
>> [Partners,3976291]
>> [Tele Sales,328760]
>>
>> Finished at
>> [23/02/2016 23:56:11.11]
>>
>> *The FP results*
>>
>> Started at
>> [23/02/2016 23:45:58.58]
>> res1: org.apache.spark.sql.DataFrame = [result: string]
>> s: org.apache.spark.sql.DataFrame = [AMOUNT_SOLD: decimal(10,0), TIME_ID:
>> timestamp, CHANNEL_ID: bigint]
>> c: org.apache.spark.sql.DataFrame = [CHANNEL_ID: double, CHANNEL_DESC:
>> string]
>> t: org.apache.spark.sql.DataFrame = [TIME_ID: timestamp,
>> CALENDAR_MONTH_DESC: string]
>> rs: org.apache.spark.sql.DataFrame = [calendar_month_desc: string,
>> channel_desc: string, TotalSales: decimal(20,0)]
>>
>> first query
>> [1998-01,Direct Sales,9086830]
>> [1998-01,Internet,1247641]
>> [1998-01,Partners,2393567]
>> [1998-02,Direct Sales,9161840]
>> [1998-02,Internet,1533193]
>> rs1: Unit = ()
>>
>> second query
>> [Direct Sales,9161840]
>> [Internet,3977374]
>> [Partners,3976291]
>> [Tele Sales,328760]
>> rs2: Unit = ()
>>
>> Finished at
>> [23/02/2016 23:53:42.42]
>>
>>
>>
>> On 22/02/2016 23:16, Mich Talebzadeh wrote:
>>
>> Hi,
>>
>> I have data stored in Hive tables on which I want to do some simple
>> manipulation.
>>
>> Currently in Spark I do the following: I get the result set from the Hive
>> tables using SQL and register it as a temporary table in Spark.
>>
>> Ideally, I would get the result set into a DF and work on the DF to slice
>> and dice the data using functional programming with filter, map, split, etc.
>>
>> I wanted to get some ideas on how to go about it.
>>
>> thanks
>>
>> val HiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
>>
>> HiveContext.sql("use oraclehadoop")
>> val rs = HiveContext.sql("""SELECT t.calendar_month_desc, c.channel_desc,
>> SUM(s.amount_sold) AS TotalSales
>> FROM smallsales s, times t, channels c
>> WHERE s.time_id = t.time_id
>> AND   s.channel_id = c.channel_id
>> GROUP BY t.calendar_month_desc, c.channel_desc
>> """)
>> rs.registerTempTable("tmp")
>>
>>
>> HiveContext.sql("""
>> SELECT calendar_month_desc AS MONTH, channel_desc AS CHANNEL, TotalSales
>> from tmp
>> ORDER BY MONTH, CHANNEL
>> """).collect.foreach(println)
>> HiveContext.sql("""
>> SELECT channel_desc AS CHANNEL, MAX(TotalSales)  AS SALES
>> FROM tmp
>> GROUP BY channel_desc
>> order by SALES DESC
>> """).collect.foreach(println)
>>
>>
>> --
>>
>> Dr Mich Talebzadeh
>>
>> LinkedIn  
>> https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>> http://talebzadehmich.wordpress.com
>>
>> NOTE: The information in this email is proprietary and confidential. This 
>> message is for the designated recipient only, if you are not the intended 
>> recipient, you should destroy it immediately. Any information in this 
>> message shall not be understood as given or endorsed by Cloud Technology 
>> Partners Ltd, its subsidiaries or their employees, unless expressly so 
>> stated. It is the responsibility of the recipient to ensure that this email 
>> is virus free, therefore neither Cloud Technology partners Ltd, its 
>> subsidiaries nor their employees accept any responsibility.
>>
>>
>>