Yes

Regards
Sab
On 24-Feb-2016 9:15 pm, "Koert Kuipers" <ko...@tresata.com> wrote:

> are you saying that HiveContext.sql(...) runs on hive, and not on spark
> sql?
>
> On Wed, Feb 24, 2016 at 1:27 AM, Sabarish Sasidharan <
> sabarish.sasidha...@manthan.com> wrote:
>
>> When using SQL your full query, including the joins, were executed in
>> Hive(or RDBMS) and only the results were brought into the Spark cluster. In
>> the FP case, the data for the 3 tables is first pulled into the Spark
>> cluster and then the join is executed.
>>
>> Thus the time difference.
>>
>> It's not immediately obvious why the results are different.
>>
>> Regards
>> Sab
>> On 24-Feb-2016 5:40 am, "Mich Talebzadeh" <
>> mich.talebza...@cloudtechnologypartners.co.uk> wrote:
>>
>>>
>>>
>>> Hi,
>>>
>>> First thanks everyone for their suggestions. Much appreciated.
>>>
>>> This was the original queries written in SQL and run against Spark-shell
>>>
>>> val HiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
>>> println ("\nStarted at"); HiveContext.sql("SELECT
>>> FROM_unixtime(unix_timestamp(), 'dd/MM/yyyy HH:mm:ss.ss')
>>> ").collect.foreach(println)
>>> HiveContext.sql("use oraclehadoop")
>>>
>>> val rs = HiveContext.sql(
>>> """
>>> SELECT t.calendar_month_desc, c.channel_desc, SUM(s.amount_sold) AS
>>> TotalSales
>>> FROM smallsales s
>>> INNER JOIN times t
>>> ON s.time_id = t.time_id
>>> INNER JOIN channels c
>>> ON s.channel_id = c.channel_id
>>> GROUP BY t.calendar_month_desc, c.channel_desc
>>> """)
>>> rs.registerTempTable("tmp")
>>> println ("\nfirst query")
>>> HiveContext.sql("""
>>> SELECT calendar_month_desc AS MONTH, channel_desc AS CHANNEL, TotalSales
>>> from tmp
>>> ORDER BY MONTH, CHANNEL LIMIT 5
>>> """).collect.foreach(println)
>>> println ("\nsecond query")
>>> HiveContext.sql("""
>>> SELECT channel_desc AS CHANNEL, MAX(TotalSales)  AS SALES
>>> FROM tmp
>>> GROUP BY channel_desc
>>> order by SALES DESC LIMIT 5
>>> """).collect.foreach(println)
>>> println ("\nFinished at"); HiveContext.sql("SELECT
>>> FROM_unixtime(unix_timestamp(), 'dd/MM/yyyy HH:mm:ss.ss')
>>> ").collect.foreach(println)
>>> sys.exit
>>>
>>> The second queries were written in FP as much as I could as below
>>>
>>> val HiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
>>> println ("\nStarted at"); HiveContext.sql("SELECT
>>> FROM_unixtime(unix_timestamp(), 'dd/MM/yyyy HH:mm:ss.ss')
>>> ").collect.foreach(println)
>>> HiveContext.sql("use oraclehadoop")
>>> var s = HiveContext.sql("SELECT AMOUNT_SOLD, TIME_ID, CHANNEL_ID FROM
>>> sales")
>>> val c = HiveContext.sql("SELECT CHANNEL_ID, CHANNEL_DESC FROM channels")
>>> val t = HiveContext.sql("SELECT TIME_ID, CALENDAR_MONTH_DESC FROM times")
>>> val rs =
>>> s.join(t,"time_id").join(c,"channel_id").groupBy("calendar_month_desc","channel_desc").agg(sum("amount_sold").as("TotalSales"))
>>> println ("\nfirst query")
>>> val rs1 =
>>> rs.orderBy("calendar_month_desc","channel_desc").take(5).foreach(println)
>>> println ("\nsecond query")
>>> val rs2
>>> =rs.groupBy("channel_desc").agg(max("TotalSales").as("SALES")).orderBy("SALES").sort(desc("SALES")).take(5).foreach(println)
>>> println ("\nFinished at"); HiveContext.sql("SELECT
>>> FROM_unixtime(unix_timestamp(), 'dd/MM/yyyy HH:mm:ss.ss')
>>> ").collect.foreach(println)
>>> sys.exit
>>>
>>>
>>>
>>> However The first query results are slightly different in SQL and FP
>>> (may be the first query code in FP is not exactly correct?) and more
>>> importantly the FP takes order of magnitude longer compared to SQL (8
>>> minutes compared to less than a minute). I am not surprised as I expected
>>> Functional Programming has to flatten up all those method calls and convert
>>> them to SQL?
>>>
>>> *The standard SQL results*
>>>
>>>
>>>
>>> Started at
>>> [23/02/2016 23:55:30.30]
>>> res1: org.apache.spark.sql.DataFrame = [result: string]
>>> rs: org.apache.spark.sql.DataFrame = [calendar_month_desc: string,
>>> channel_desc: string, TotalSales: decimal(20,0)]
>>>
>>> first query
>>> [1998-01,Direct Sales,9161730]
>>> [1998-01,Internet,1248581]
>>> [1998-01,Partners,2409776]
>>> [1998-02,Direct Sales,9161840]
>>> [1998-02,Internet,1533193]
>>>
>>>
>>>
>>> second query
>>> [Direct Sales,9161840]
>>> [Internet,3977374]
>>> [Partners,3976291]
>>> [Tele Sales,328760]
>>>
>>> Finished at
>>> [23/02/2016 23:56:11.11]
>>>
>>> *The FP results*
>>>
>>> Started at
>>> [23/02/2016 23:45:58.58]
>>> res1: org.apache.spark.sql.DataFrame = [result: string]
>>> s: org.apache.spark.sql.DataFrame = [AMOUNT_SOLD: decimal(10,0),
>>> TIME_ID: timestamp, CHANNEL_ID: bigint]
>>> c: org.apache.spark.sql.DataFrame = [CHANNEL_ID: double, CHANNEL_DESC:
>>> string]
>>> t: org.apache.spark.sql.DataFrame = [TIME_ID: timestamp,
>>> CALENDAR_MONTH_DESC: string]
>>> rs: org.apache.spark.sql.DataFrame = [calendar_month_desc: string,
>>> channel_desc: string, TotalSales: decimal(20,0)]
>>>
>>> first query
>>> [1998-01,Direct Sales,9086830]
>>> [1998-01,Internet,1247641]
>>> [1998-01,Partners,2393567]
>>> [1998-02,Direct Sales,9161840]
>>> [1998-02,Internet,1533193]
>>> rs1: Unit = ()
>>>
>>> second query
>>> [Direct Sales,9161840]
>>> [Internet,3977374]
>>> [Partners,3976291]
>>> [Tele Sales,328760]
>>> rs2: Unit = ()
>>>
>>> Finished at
>>> [23/02/2016 23:53:42.42]
>>>
>>>
>>>
>>> On 22/02/2016 23:16, Mich Talebzadeh wrote:
>>>
>>> Hi,
>>>
>>> I have data stored in Hive tables that I want to do simple manipulation.
>>>
>>> Currently in Spark I perform the following with getting the result set
>>> using SQL from Hive tables, registering as a temporary table in Spark
>>>
>>> Now Ideally I can get the result set into a DF and work on DF to slice
>>> and dice the data using functional programming with filter, map. split etc.
>>>
>>> I wanted to get some ideas on how to go about it.
>>>
>>> thanks
>>>
>>> val HiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
>>>
>>> HiveContext.sql("use oraclehadoop")
>>> val rs = HiveContext.sql("""SELECT t.calendar_month_desc,
>>> c.channel_desc, SUM(s.amount_sold) AS TotalSales
>>> FROM smallsales s, times t, channels c
>>> WHERE s.time_id = t.time_id
>>> AND   s.channel_id = c.channel_id
>>> GROUP BY t.calendar_month_desc, c.channel_desc
>>> """)
>>> *rs.registerTempTable("tmp")*
>>>
>>>
>>> HiveContext.sql("""
>>> SELECT calendar_month_desc AS MONTH, channel_desc AS CHANNEL, TotalSales
>>> from tmp
>>> ORDER BY MONTH, CHANNEL
>>> """).collect.foreach(println)
>>> HiveContext.sql("""
>>> SELECT channel_desc AS CHANNEL, MAX(TotalSales)  AS SALES
>>> FROM tmp
>>> GROUP BY channel_desc
>>> order by SALES DESC
>>> """).collect.foreach(println)
>>>
>>>
>>> --
>>>
>>> Dr Mich Talebzadeh
>>>
>>> LinkedIn  
>>> https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>>> http://talebzadehmich.wordpress.com
>>>
>>> NOTE: The information in this email is proprietary and confidential. This 
>>> message is for the designated recipient only, if you are not the intended 
>>> recipient, you should destroy it immediately. Any information in this 
>>> message shall not be understood as given or endorsed by Cloud Technology 
>>> Partners Ltd, its subsidiaries or their employees, unless expressly so 
>>> stated. It is the responsibility of the recipient to ensure that this email 
>>> is virus free, therefore neither Cloud Technology partners Ltd, its 
>>> subsidiaries nor their employees accept any responsibility.
>>>
>>>
>>>
>>>
>>>
>>> --
>>>
>>> Dr Mich Talebzadeh
>>>
>>> LinkedIn  
>>> https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>>> http://talebzadehmich.wordpress.com
>>>
>>> NOTE: The information in this email is proprietary and confidential. This 
>>> message is for the designated recipient only, if you are not the intended 
>>> recipient, you should destroy it immediately. Any information in this 
>>> message shall not be understood as given or endorsed by Cloud Technology 
>>> Partners Ltd, its subsidiaries or their employees, unless expressly so 
>>> stated. It is the responsibility of the recipient to ensure that this email 
>>> is virus free, therefore neither Cloud Technology partners Ltd, its 
>>> subsidiaries nor their employees accept any responsibility.
>>>
>>>
>>>
>

Reply via email to