Re: Using functional programming rather than SQL

2016-02-24 Thread Mich Talebzadeh
 

This is a point that I would like to clarify, please. 

These are my assumptions: 

* Data resides in Hive tables in a Hive database
* Data has to be extracted from these tables. The tables are stored as ORC, so
they benefit from ORC optimizations (storage indexes at the file, stripe (64 MB
chunks of data) and row-group (10K rows) levels, holding min, max and sum for
each column)
* Using HiveContext means Hive's internal optimizations are used as well, right?
* Spark contacts Hive and instructs it to get the data. Hive does all of that
* Spark takes that data into its memory space and runs the queries

Does that make sense? 
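
A minimal sketch for checking the pushdown assumption (using the "oraclehadoop"
database and "sales" table from the scripts later in this thread; the filter
value is just an example, and the spark-shell's sc is assumed to be available):
print the plan and see whether the filter sits next to the HiveTableScan or is
applied later inside Spark.

val hiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
// ORC predicate pushdown is off by default in Spark 1.x; enable it explicitly
hiveContext.setConf("spark.sql.orc.filterPushdown", "true")
hiveContext.sql("use oraclehadoop")
// explain(true) prints the parsed, analysed, optimised and physical plans
hiveContext.sql("SELECT amount_sold FROM sales WHERE channel_id = 3").explain(true)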

On 24/02/2016 20:05, Koert Kuipers wrote: 

> my assumption, which is apparently incorrect, was that the SQL gets 
> translated into a catalyst plan that is executed in spark. the dataframe 
> operations (referred to by Mich as the FP results) also get translated into a 
> catalyst plan that is executed on the exact same spark platform. so unless 
> the SQL gets translated into a much better plan (perhaps thanks to some 
> pushdown into ORC?), i dont see why it can be much faster.
> 
> On Wed, Feb 24, 2016 at 2:59 PM, Koert Kuipers  wrote:
> 
> i am still missing something. if it is executed in the source database, which 
> is hive in this case, then it does need hive, no? how can you execute in hive 
> without needing hive? 
> 
> On Wed, Feb 24, 2016 at 1:25 PM, Sabarish Sasidharan 
>  wrote:
> 
> I never said it needs one. All I said is that when calling context.sql() the 
> sql is executed in the source database (assuming datasource is Hive or some 
> RDBMS) 
> 
> Regards
> Sab 
> 
> Regards
> Sab 
> 
> On 24-Feb-2016 11:49 pm, "Mohannad Ali"  wrote:
> 
> That is incorrect HiveContext does not need a hive instance to run. 
> On Feb 24, 2016 19:15, "Sabarish Sasidharan" 
>  wrote:
> 
> Yes 
> 
> Regards
> Sab 
> On 24-Feb-2016 9:15 pm, "Koert Kuipers"  wrote:
> 
> are you saying that HiveContext.sql(...) runs on hive, and not on spark sql?
> 
> On Wed, Feb 24, 2016 at 1:27 AM, Sabarish Sasidharan 
>  wrote:
> 
> When using SQL your full query, including the joins, were executed in Hive(or 
> RDBMS) and only the results were brought into the Spark cluster. In the FP 
> case, the data for the 3 tables is first pulled into the Spark cluster and 
> then the join is executed. 
> 
> Thus the time difference. 
> 
> It's not immediately obvious why the results are different. 
> 
> Regards
> Sab 
> 
> On 24-Feb-2016 5:40 am, "Mich Talebzadeh" 
>  wrote:
> 
> Hi, 
> 
> First thanks everyone for their suggestions. Much appreciated. 
> 
> This was the original queries written in SQL and run against Spark-shell 
> 
> val HiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
> println ("nStarted at"); HiveContext.sql("SELECT 
> FROM_unixtime(unix_timestamp(), 'dd/MM/ HH:mm:ss.ss') 
> ").collect.foreach(println)
> HiveContext.sql("use oraclehadoop") 
> 
> val rs = HiveContext.sql(
> """
> SELECT t.calendar_month_desc, c.channel_desc, SUM(s.amount_sold) AS TotalSales
> FROM smallsales s
> INNER JOIN times t
> ON s.time_id = t.time_id
> INNER JOIN channels c
> ON s.channel_id = c.channel_id
> GROUP BY t.calendar_month_desc, c.channel_desc
> """)
> rs.registerTempTable("tmp")
> println ("nfirst query")
> HiveContext.sql("""
> SELECT calendar_month_desc AS MONTH, channel_desc AS CHANNEL, TotalSales
> from tmp
> ORDER BY MONTH, CHANNEL LIMIT 5
> """).collect.foreach(println)
> println ("nsecond query")
> HiveContext.sql("""
> SELECT channel_desc AS CHANNEL, MAX(TotalSales) AS SALES
> FROM tmp
> GROUP BY channel_desc
> order by SALES DESC LIMIT 5
> """).collect.foreach(println)
> println ("nFinished at"); HiveContext.sql("SELECT 
> FROM_unixtime(unix_timestamp(), 'dd/MM/ HH:mm:ss.ss') 
> ").collect.foreach(println)
> sys.exit 
> 
> The second queries were written in FP as much as I could as below 
> 
> val HiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
> println ("nStarted at"); HiveContext.sql("SELECT 
> FROM_unixtime(unix_timestamp(), 'dd/MM/ HH:mm:ss.ss') 
> ").collect.foreach(println)
> HiveContext.sql("use oraclehadoop")
> var s = HiveContext.sql("SELECT AMOUNT_SOLD, TIME_ID, CHANNEL_ID FROM sales")
> val c = HiveContext.sql("SELECT CHANNEL_ID, CHANNEL_DESC FROM channels")
> val t = HiveContext.sql("SELECT TIME_ID, CALENDAR_MONTH_DESC FROM times")
> val rs = 
> s.join(t,"time_id").join(c,"channel_id").groupBy("calendar_month_desc","channel_desc").agg(sum("amount_sold").as("TotalSales"))
> println ("nfirst query")
> val rs1 = 
> rs.orderBy("calendar_month_desc","channel_desc").take(5).foreach(println)
> println ("nsecond query")
> val rs2 
> 

Re: Using functional programming rather than SQL

2016-02-24 Thread Mich Talebzadeh
 

Hi Koert, 

My bad. I used a smaller "sales" table in the SQL version. Kindly see my
new figures. 
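
For an apples-to-apples re-run, a minimal sketch (table and column names as used
elsewhere in this thread) is to point both the SQL and the DataFrame versions at
the same "sales" table; the earlier SQL run read "smallsales", which also
explains the differing row values.

val hiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
import org.apache.spark.sql.functions._
hiveContext.sql("use oraclehadoop")

// SQL version over the full sales table
val sqlResult = hiveContext.sql("""
SELECT t.calendar_month_desc, c.channel_desc, SUM(s.amount_sold) AS TotalSales
FROM sales s
INNER JOIN times t ON s.time_id = t.time_id
INNER JOIN channels c ON s.channel_id = c.channel_id
GROUP BY t.calendar_month_desc, c.channel_desc
""")

// DataFrame version over the same table
val dfResult = hiveContext.table("sales")
  .join(hiveContext.table("times"), "time_id")
  .join(hiveContext.table("channels"), "channel_id")
  .groupBy("calendar_month_desc", "channel_desc")
  .agg(sum("amount_sold").as("TotalSales"))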

On 24/02/2016 20:05, Koert Kuipers wrote: 

> my assumption, which is apparently incorrect, was that the SQL gets 
> translated into a catalyst plan that is executed in spark. the dataframe 
> operations (referred to by Mich as the FP results) also get translated into a 
> catalyst plan that is executed on the exact same spark platform. so unless 
> the SQL gets translated into a much better plan (perhaps thanks to some 
> pushdown into ORC?), i dont see why it can be much faster.
> 
> On Wed, Feb 24, 2016 at 2:59 PM, Koert Kuipers  wrote:
> 
> i am still missing something. if it is executed in the source database, which 
> is hive in this case, then it does need hive, no? how can you execute in hive 
> without needing hive? 
> 
> On Wed, Feb 24, 2016 at 1:25 PM, Sabarish Sasidharan 
>  wrote:
> 
> I never said it needs one. All I said is that when calling context.sql() the 
> sql is executed in the source database (assuming datasource is Hive or some 
> RDBMS) 
> 
> Regards
> Sab 
> 
> Regards
> Sab 
> 
> On 24-Feb-2016 11:49 pm, "Mohannad Ali"  wrote:
> 
> That is incorrect HiveContext does not need a hive instance to run. 
> On Feb 24, 2016 19:15, "Sabarish Sasidharan" 
>  wrote:
> 
> Yes 
> 
> Regards
> Sab 
> On 24-Feb-2016 9:15 pm, "Koert Kuipers"  wrote:
> 
> are you saying that HiveContext.sql(...) runs on hive, and not on spark sql?
> 
> On Wed, Feb 24, 2016 at 1:27 AM, Sabarish Sasidharan 
>  wrote:
> 
> When using SQL your full query, including the joins, were executed in Hive(or 
> RDBMS) and only the results were brought into the Spark cluster. In the FP 
> case, the data for the 3 tables is first pulled into the Spark cluster and 
> then the join is executed. 
> 
> Thus the time difference. 
> 
> It's not immediately obvious why the results are different. 
> 
> Regards
> Sab 
> 
> On 24-Feb-2016 5:40 am, "Mich Talebzadeh" 
>  wrote:
> 
> Hi, 
> 
> First thanks everyone for their suggestions. Much appreciated. 
> 
> This was the original queries written in SQL and run against Spark-shell 
> 
> val HiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
> println ("nStarted at"); HiveContext.sql("SELECT 
> FROM_unixtime(unix_timestamp(), 'dd/MM/ HH:mm:ss.ss') 
> ").collect.foreach(println)
> HiveContext.sql("use oraclehadoop") 
> 
> val rs = HiveContext.sql(
> """
> SELECT t.calendar_month_desc, c.channel_desc, SUM(s.amount_sold) AS TotalSales
> FROM smallsales s
> INNER JOIN times t
> ON s.time_id = t.time_id
> INNER JOIN channels c
> ON s.channel_id = c.channel_id
> GROUP BY t.calendar_month_desc, c.channel_desc
> """)
> rs.registerTempTable("tmp")
> println ("nfirst query")
> HiveContext.sql("""
> SELECT calendar_month_desc AS MONTH, channel_desc AS CHANNEL, TotalSales
> from tmp
> ORDER BY MONTH, CHANNEL LIMIT 5
> """).collect.foreach(println)
> println ("nsecond query")
> HiveContext.sql("""
> SELECT channel_desc AS CHANNEL, MAX(TotalSales) AS SALES
> FROM tmp
> GROUP BY channel_desc
> order by SALES DESC LIMIT 5
> """).collect.foreach(println)
> println ("nFinished at"); HiveContext.sql("SELECT 
> FROM_unixtime(unix_timestamp(), 'dd/MM/ HH:mm:ss.ss') 
> ").collect.foreach(println)
> sys.exit 
> 
> The second queries were written in FP as much as I could as below 
> 
> val HiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
> println ("nStarted at"); HiveContext.sql("SELECT 
> FROM_unixtime(unix_timestamp(), 'dd/MM/ HH:mm:ss.ss') 
> ").collect.foreach(println)
> HiveContext.sql("use oraclehadoop")
> var s = HiveContext.sql("SELECT AMOUNT_SOLD, TIME_ID, CHANNEL_ID FROM sales")
> val c = HiveContext.sql("SELECT CHANNEL_ID, CHANNEL_DESC FROM channels")
> val t = HiveContext.sql("SELECT TIME_ID, CALENDAR_MONTH_DESC FROM times")
> val rs = 
> s.join(t,"time_id").join(c,"channel_id").groupBy("calendar_month_desc","channel_desc").agg(sum("amount_sold").as("TotalSales"))
> println ("nfirst query")
> val rs1 = 
> rs.orderBy("calendar_month_desc","channel_desc").take(5).foreach(println)
> println ("nsecond query")
> val rs2 
> =rs.groupBy("channel_desc").agg(max("TotalSales").as("SALES")).orderBy("SALES").sort(desc("SALES")).take(5).foreach(println)
> println ("nFinished at"); HiveContext.sql("SELECT 
> FROM_unixtime(unix_timestamp(), 'dd/MM/ HH:mm:ss.ss') 
> ").collect.foreach(println)
> sys.exit 
> 
> However The first query results are slightly different in SQL and FP (may be 
> the first query code in FP is not exactly correct?) and more importantly the 
> FP takes order of magnitude longer compared to SQL (8 minutes compared to 
> less than a minute). I am not surprised as I expected Functional Programming 
> has to flatten up all those method calls and convert them to 

Re: Using functional programming rather than SQL

2016-02-24 Thread Koert Kuipers
my assumption, which is apparently incorrect, was that the SQL gets
translated into a catalyst plan that is executed in spark. the dataframe
operations (referred to by Mich as the FP results) also get translated into
a catalyst plan that is executed on the exact same spark platform. so
unless the SQL gets translated into a much better plan (perhaps thanks to
some pushdown into ORC?), i dont see why it can be much faster.
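
A minimal way to test that assumption, reusing the HiveContext val and the "tmp"
temporary table from the scripts quoted below, is to print both Catalyst plans
and compare them:

import org.apache.spark.sql.functions._
// SQL route
val viaSql = HiveContext.sql(
  "SELECT channel_desc, MAX(TotalSales) AS SALES FROM tmp GROUP BY channel_desc")
// DataFrame route; table() also resolves registered temporary tables
val viaDataFrame = HiveContext.table("tmp")
  .groupBy("channel_desc")
  .agg(max("TotalSales").as("SALES"))
// If the two physical plans match, execution times should match as well
viaSql.explain(true)
viaDataFrame.explain(true)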




On Wed, Feb 24, 2016 at 2:59 PM, Koert Kuipers  wrote:

> i am still missing something. if it is executed in the source database,
> which is hive in this case, then it does need hive, no? how can you execute
> in hive without needing hive?
>
> On Wed, Feb 24, 2016 at 1:25 PM, Sabarish Sasidharan <
> sabarish.sasidha...@manthan.com> wrote:
>
>> I never said it needs one. All I said is that when calling context.sql()
>> the sql is executed in the source database (assuming datasource is Hive or
>> some RDBMS)
>>
>> Regards
>> Sab
>>
>> Regards
>> Sab
>> On 24-Feb-2016 11:49 pm, "Mohannad Ali"  wrote:
>>
>>> That is incorrect HiveContext does not need a hive instance to run.
>>> On Feb 24, 2016 19:15, "Sabarish Sasidharan" <
>>> sabarish.sasidha...@manthan.com> wrote:
>>>
 Yes

 Regards
 Sab
 On 24-Feb-2016 9:15 pm, "Koert Kuipers"  wrote:

> are you saying that HiveContext.sql(...) runs on hive, and not on
> spark sql?
>
> On Wed, Feb 24, 2016 at 1:27 AM, Sabarish Sasidharan <
> sabarish.sasidha...@manthan.com> wrote:
>
>> When using SQL your full query, including the joins, were executed in
>> Hive(or RDBMS) and only the results were brought into the Spark cluster. 
>> In
>> the FP case, the data for the 3 tables is first pulled into the Spark
>> cluster and then the join is executed.
>>
>> Thus the time difference.
>>
>> It's not immediately obvious why the results are different.
>>
>> Regards
>> Sab
>> On 24-Feb-2016 5:40 am, "Mich Talebzadeh" <
>> mich.talebza...@cloudtechnologypartners.co.uk> wrote:
>>
>>>
>>>
>>> Hi,
>>>
>>> First thanks everyone for their suggestions. Much appreciated.
>>>
>>> This was the original queries written in SQL and run against
>>> Spark-shell
>>>
>>> val HiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
>>> println ("\nStarted at"); HiveContext.sql("SELECT
>>> FROM_unixtime(unix_timestamp(), 'dd/MM/ HH:mm:ss.ss')
>>> ").collect.foreach(println)
>>> HiveContext.sql("use oraclehadoop")
>>>
>>> val rs = HiveContext.sql(
>>> """
>>> SELECT t.calendar_month_desc, c.channel_desc, SUM(s.amount_sold) AS
>>> TotalSales
>>> FROM smallsales s
>>> INNER JOIN times t
>>> ON s.time_id = t.time_id
>>> INNER JOIN channels c
>>> ON s.channel_id = c.channel_id
>>> GROUP BY t.calendar_month_desc, c.channel_desc
>>> """)
>>> rs.registerTempTable("tmp")
>>> println ("\nfirst query")
>>> HiveContext.sql("""
>>> SELECT calendar_month_desc AS MONTH, channel_desc AS CHANNEL,
>>> TotalSales
>>> from tmp
>>> ORDER BY MONTH, CHANNEL LIMIT 5
>>> """).collect.foreach(println)
>>> println ("\nsecond query")
>>> HiveContext.sql("""
>>> SELECT channel_desc AS CHANNEL, MAX(TotalSales)  AS SALES
>>> FROM tmp
>>> GROUP BY channel_desc
>>> order by SALES DESC LIMIT 5
>>> """).collect.foreach(println)
>>> println ("\nFinished at"); HiveContext.sql("SELECT
>>> FROM_unixtime(unix_timestamp(), 'dd/MM/ HH:mm:ss.ss')
>>> ").collect.foreach(println)
>>> sys.exit
>>>
>>> The second queries were written in FP as much as I could as below
>>>
>>> val HiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
>>> println ("\nStarted at"); HiveContext.sql("SELECT
>>> FROM_unixtime(unix_timestamp(), 'dd/MM/ HH:mm:ss.ss')
>>> ").collect.foreach(println)
>>> HiveContext.sql("use oraclehadoop")
>>> var s = HiveContext.sql("SELECT AMOUNT_SOLD, TIME_ID, CHANNEL_ID
>>> FROM sales")
>>> val c = HiveContext.sql("SELECT CHANNEL_ID, CHANNEL_DESC FROM
>>> channels")
>>> val t = HiveContext.sql("SELECT TIME_ID, CALENDAR_MONTH_DESC FROM
>>> times")
>>> val rs =
>>> s.join(t,"time_id").join(c,"channel_id").groupBy("calendar_month_desc","channel_desc").agg(sum("amount_sold").as("TotalSales"))
>>> println ("\nfirst query")
>>> val rs1 =
>>> rs.orderBy("calendar_month_desc","channel_desc").take(5).foreach(println)
>>> println ("\nsecond query")
>>> val rs2
>>> =rs.groupBy("channel_desc").agg(max("TotalSales").as("SALES")).orderBy("SALES").sort(desc("SALES")).take(5).foreach(println)
>>> println ("\nFinished at"); HiveContext.sql("SELECT
>>> FROM_unixtime(unix_timestamp(), 'dd/MM/ HH:mm:ss.ss')
>>> ").collect.foreach(println)
>>> sys.exit
>>>
>>>

Re: Using functional programming rather than SQL

2016-02-24 Thread Koert Kuipers
i am still missing something. if it is executed in the source database,
which is hive in this case, then it does need hive, no? how can you execute
in hive without needing hive?

On Wed, Feb 24, 2016 at 1:25 PM, Sabarish Sasidharan <
sabarish.sasidha...@manthan.com> wrote:

> I never said it needs one. All I said is that when calling context.sql()
> the sql is executed in the source database (assuming datasource is Hive or
> some RDBMS)
>
> Regards
> Sab
>
> Regards
> Sab
> On 24-Feb-2016 11:49 pm, "Mohannad Ali"  wrote:
>
>> That is incorrect HiveContext does not need a hive instance to run.
>> On Feb 24, 2016 19:15, "Sabarish Sasidharan" <
>> sabarish.sasidha...@manthan.com> wrote:
>>
>>> Yes
>>>
>>> Regards
>>> Sab
>>> On 24-Feb-2016 9:15 pm, "Koert Kuipers"  wrote:
>>>
 are you saying that HiveContext.sql(...) runs on hive, and not on
 spark sql?

 On Wed, Feb 24, 2016 at 1:27 AM, Sabarish Sasidharan <
 sabarish.sasidha...@manthan.com> wrote:

> When using SQL your full query, including the joins, were executed in
> Hive(or RDBMS) and only the results were brought into the Spark cluster. 
> In
> the FP case, the data for the 3 tables is first pulled into the Spark
> cluster and then the join is executed.
>
> Thus the time difference.
>
> It's not immediately obvious why the results are different.
>
> Regards
> Sab
> On 24-Feb-2016 5:40 am, "Mich Talebzadeh" <
> mich.talebza...@cloudtechnologypartners.co.uk> wrote:
>
>>
>>
>> Hi,
>>
>> First thanks everyone for their suggestions. Much appreciated.
>>
>> This was the original queries written in SQL and run against
>> Spark-shell
>>
>> val HiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
>> println ("\nStarted at"); HiveContext.sql("SELECT
>> FROM_unixtime(unix_timestamp(), 'dd/MM/ HH:mm:ss.ss')
>> ").collect.foreach(println)
>> HiveContext.sql("use oraclehadoop")
>>
>> val rs = HiveContext.sql(
>> """
>> SELECT t.calendar_month_desc, c.channel_desc, SUM(s.amount_sold) AS
>> TotalSales
>> FROM smallsales s
>> INNER JOIN times t
>> ON s.time_id = t.time_id
>> INNER JOIN channels c
>> ON s.channel_id = c.channel_id
>> GROUP BY t.calendar_month_desc, c.channel_desc
>> """)
>> rs.registerTempTable("tmp")
>> println ("\nfirst query")
>> HiveContext.sql("""
>> SELECT calendar_month_desc AS MONTH, channel_desc AS CHANNEL,
>> TotalSales
>> from tmp
>> ORDER BY MONTH, CHANNEL LIMIT 5
>> """).collect.foreach(println)
>> println ("\nsecond query")
>> HiveContext.sql("""
>> SELECT channel_desc AS CHANNEL, MAX(TotalSales)  AS SALES
>> FROM tmp
>> GROUP BY channel_desc
>> order by SALES DESC LIMIT 5
>> """).collect.foreach(println)
>> println ("\nFinished at"); HiveContext.sql("SELECT
>> FROM_unixtime(unix_timestamp(), 'dd/MM/ HH:mm:ss.ss')
>> ").collect.foreach(println)
>> sys.exit
>>
>> The second queries were written in FP as much as I could as below
>>
>> val HiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
>> println ("\nStarted at"); HiveContext.sql("SELECT
>> FROM_unixtime(unix_timestamp(), 'dd/MM/ HH:mm:ss.ss')
>> ").collect.foreach(println)
>> HiveContext.sql("use oraclehadoop")
>> var s = HiveContext.sql("SELECT AMOUNT_SOLD, TIME_ID, CHANNEL_ID FROM
>> sales")
>> val c = HiveContext.sql("SELECT CHANNEL_ID, CHANNEL_DESC FROM
>> channels")
>> val t = HiveContext.sql("SELECT TIME_ID, CALENDAR_MONTH_DESC FROM
>> times")
>> val rs =
>> s.join(t,"time_id").join(c,"channel_id").groupBy("calendar_month_desc","channel_desc").agg(sum("amount_sold").as("TotalSales"))
>> println ("\nfirst query")
>> val rs1 =
>> rs.orderBy("calendar_month_desc","channel_desc").take(5).foreach(println)
>> println ("\nsecond query")
>> val rs2
>> =rs.groupBy("channel_desc").agg(max("TotalSales").as("SALES")).orderBy("SALES").sort(desc("SALES")).take(5).foreach(println)
>> println ("\nFinished at"); HiveContext.sql("SELECT
>> FROM_unixtime(unix_timestamp(), 'dd/MM/ HH:mm:ss.ss')
>> ").collect.foreach(println)
>> sys.exit
>>
>>
>>
>> However The first query results are slightly different in SQL and FP
>> (may be the first query code in FP is not exactly correct?) and more
>> importantly the FP takes order of magnitude longer compared to SQL (8
>> minutes compared to less than a minute). I am not surprised as I expected
>> Functional Programming has to flatten up all those method calls and 
>> convert
>> them to SQL?
>>
>> *The standard SQL results*
>>
>>
>>
>> Started at
>> [23/02/2016 23:55:30.30]
>> res1: org.apache.spark.sql.DataFrame = [result: string]
>> rs: 

Re: Using functional programming rather than SQL

2016-02-24 Thread Mohannad Ali
My apologies, I definitely misunderstood. You are 100% correct.
On Feb 24, 2016 19:25, "Sabarish Sasidharan" <
sabarish.sasidha...@manthan.com> wrote:

> I never said it needs one. All I said is that when calling context.sql()
> the sql is executed in the source database (assuming datasource is Hive or
> some RDBMS)
>
> Regards
> Sab
>
> Regards
> Sab
> On 24-Feb-2016 11:49 pm, "Mohannad Ali"  wrote:
>
>> That is incorrect HiveContext does not need a hive instance to run.
>> On Feb 24, 2016 19:15, "Sabarish Sasidharan" <
>> sabarish.sasidha...@manthan.com> wrote:
>>
>>> Yes
>>>
>>> Regards
>>> Sab
>>> On 24-Feb-2016 9:15 pm, "Koert Kuipers"  wrote:
>>>
 are you saying that HiveContext.sql(...) runs on hive, and not on
 spark sql?

 On Wed, Feb 24, 2016 at 1:27 AM, Sabarish Sasidharan <
 sabarish.sasidha...@manthan.com> wrote:

> When using SQL your full query, including the joins, were executed in
> Hive(or RDBMS) and only the results were brought into the Spark cluster. 
> In
> the FP case, the data for the 3 tables is first pulled into the Spark
> cluster and then the join is executed.
>
> Thus the time difference.
>
> It's not immediately obvious why the results are different.
>
> Regards
> Sab
> On 24-Feb-2016 5:40 am, "Mich Talebzadeh" <
> mich.talebza...@cloudtechnologypartners.co.uk> wrote:
>
>>
>>
>> Hi,
>>
>> First thanks everyone for their suggestions. Much appreciated.
>>
>> This was the original queries written in SQL and run against
>> Spark-shell
>>
>> val HiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
>> println ("\nStarted at"); HiveContext.sql("SELECT
>> FROM_unixtime(unix_timestamp(), 'dd/MM/ HH:mm:ss.ss')
>> ").collect.foreach(println)
>> HiveContext.sql("use oraclehadoop")
>>
>> val rs = HiveContext.sql(
>> """
>> SELECT t.calendar_month_desc, c.channel_desc, SUM(s.amount_sold) AS
>> TotalSales
>> FROM smallsales s
>> INNER JOIN times t
>> ON s.time_id = t.time_id
>> INNER JOIN channels c
>> ON s.channel_id = c.channel_id
>> GROUP BY t.calendar_month_desc, c.channel_desc
>> """)
>> rs.registerTempTable("tmp")
>> println ("\nfirst query")
>> HiveContext.sql("""
>> SELECT calendar_month_desc AS MONTH, channel_desc AS CHANNEL,
>> TotalSales
>> from tmp
>> ORDER BY MONTH, CHANNEL LIMIT 5
>> """).collect.foreach(println)
>> println ("\nsecond query")
>> HiveContext.sql("""
>> SELECT channel_desc AS CHANNEL, MAX(TotalSales)  AS SALES
>> FROM tmp
>> GROUP BY channel_desc
>> order by SALES DESC LIMIT 5
>> """).collect.foreach(println)
>> println ("\nFinished at"); HiveContext.sql("SELECT
>> FROM_unixtime(unix_timestamp(), 'dd/MM/ HH:mm:ss.ss')
>> ").collect.foreach(println)
>> sys.exit
>>
>> The second queries were written in FP as much as I could as below
>>
>> val HiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
>> println ("\nStarted at"); HiveContext.sql("SELECT
>> FROM_unixtime(unix_timestamp(), 'dd/MM/ HH:mm:ss.ss')
>> ").collect.foreach(println)
>> HiveContext.sql("use oraclehadoop")
>> var s = HiveContext.sql("SELECT AMOUNT_SOLD, TIME_ID, CHANNEL_ID FROM
>> sales")
>> val c = HiveContext.sql("SELECT CHANNEL_ID, CHANNEL_DESC FROM
>> channels")
>> val t = HiveContext.sql("SELECT TIME_ID, CALENDAR_MONTH_DESC FROM
>> times")
>> val rs =
>> s.join(t,"time_id").join(c,"channel_id").groupBy("calendar_month_desc","channel_desc").agg(sum("amount_sold").as("TotalSales"))
>> println ("\nfirst query")
>> val rs1 =
>> rs.orderBy("calendar_month_desc","channel_desc").take(5).foreach(println)
>> println ("\nsecond query")
>> val rs2
>> =rs.groupBy("channel_desc").agg(max("TotalSales").as("SALES")).orderBy("SALES").sort(desc("SALES")).take(5).foreach(println)
>> println ("\nFinished at"); HiveContext.sql("SELECT
>> FROM_unixtime(unix_timestamp(), 'dd/MM/ HH:mm:ss.ss')
>> ").collect.foreach(println)
>> sys.exit
>>
>>
>>
>> However The first query results are slightly different in SQL and FP
>> (may be the first query code in FP is not exactly correct?) and more
>> importantly the FP takes order of magnitude longer compared to SQL (8
>> minutes compared to less than a minute). I am not surprised as I expected
>> Functional Programming has to flatten up all those method calls and 
>> convert
>> them to SQL?
>>
>> *The standard SQL results*
>>
>>
>>
>> Started at
>> [23/02/2016 23:55:30.30]
>> res1: org.apache.spark.sql.DataFrame = [result: string]
>> rs: org.apache.spark.sql.DataFrame = [calendar_month_desc: string,
>> channel_desc: string, TotalSales: decimal(20,0)]
>>
>> first 

Re: Using functional programming rather than SQL

2016-02-24 Thread Sabarish Sasidharan
I never said it needs one. All I said is that when calling context.sql()
the sql is executed in the source database (assuming datasource is Hive or
some RDBMS)

Regards
Sab

Regards
Sab
On 24-Feb-2016 11:49 pm, "Mohannad Ali"  wrote:

> That is incorrect HiveContext does not need a hive instance to run.
> On Feb 24, 2016 19:15, "Sabarish Sasidharan" <
> sabarish.sasidha...@manthan.com> wrote:
>
>> Yes
>>
>> Regards
>> Sab
>> On 24-Feb-2016 9:15 pm, "Koert Kuipers"  wrote:
>>
>>> are you saying that HiveContext.sql(...) runs on hive, and not on spark
>>> sql?
>>>
>>> On Wed, Feb 24, 2016 at 1:27 AM, Sabarish Sasidharan <
>>> sabarish.sasidha...@manthan.com> wrote:
>>>
 When using SQL your full query, including the joins, were executed in
 Hive(or RDBMS) and only the results were brought into the Spark cluster. In
 the FP case, the data for the 3 tables is first pulled into the Spark
 cluster and then the join is executed.

 Thus the time difference.

 It's not immediately obvious why the results are different.

 Regards
 Sab
 On 24-Feb-2016 5:40 am, "Mich Talebzadeh" <
 mich.talebza...@cloudtechnologypartners.co.uk> wrote:

>
>
> Hi,
>
> First thanks everyone for their suggestions. Much appreciated.
>
> This was the original queries written in SQL and run against
> Spark-shell
>
> val HiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
> println ("\nStarted at"); HiveContext.sql("SELECT
> FROM_unixtime(unix_timestamp(), 'dd/MM/ HH:mm:ss.ss')
> ").collect.foreach(println)
> HiveContext.sql("use oraclehadoop")
>
> val rs = HiveContext.sql(
> """
> SELECT t.calendar_month_desc, c.channel_desc, SUM(s.amount_sold) AS
> TotalSales
> FROM smallsales s
> INNER JOIN times t
> ON s.time_id = t.time_id
> INNER JOIN channels c
> ON s.channel_id = c.channel_id
> GROUP BY t.calendar_month_desc, c.channel_desc
> """)
> rs.registerTempTable("tmp")
> println ("\nfirst query")
> HiveContext.sql("""
> SELECT calendar_month_desc AS MONTH, channel_desc AS CHANNEL,
> TotalSales
> from tmp
> ORDER BY MONTH, CHANNEL LIMIT 5
> """).collect.foreach(println)
> println ("\nsecond query")
> HiveContext.sql("""
> SELECT channel_desc AS CHANNEL, MAX(TotalSales)  AS SALES
> FROM tmp
> GROUP BY channel_desc
> order by SALES DESC LIMIT 5
> """).collect.foreach(println)
> println ("\nFinished at"); HiveContext.sql("SELECT
> FROM_unixtime(unix_timestamp(), 'dd/MM/ HH:mm:ss.ss')
> ").collect.foreach(println)
> sys.exit
>
> The second queries were written in FP as much as I could as below
>
> val HiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
> println ("\nStarted at"); HiveContext.sql("SELECT
> FROM_unixtime(unix_timestamp(), 'dd/MM/ HH:mm:ss.ss')
> ").collect.foreach(println)
> HiveContext.sql("use oraclehadoop")
> var s = HiveContext.sql("SELECT AMOUNT_SOLD, TIME_ID, CHANNEL_ID FROM
> sales")
> val c = HiveContext.sql("SELECT CHANNEL_ID, CHANNEL_DESC FROM
> channels")
> val t = HiveContext.sql("SELECT TIME_ID, CALENDAR_MONTH_DESC FROM
> times")
> val rs =
> s.join(t,"time_id").join(c,"channel_id").groupBy("calendar_month_desc","channel_desc").agg(sum("amount_sold").as("TotalSales"))
> println ("\nfirst query")
> val rs1 =
> rs.orderBy("calendar_month_desc","channel_desc").take(5).foreach(println)
> println ("\nsecond query")
> val rs2
> =rs.groupBy("channel_desc").agg(max("TotalSales").as("SALES")).orderBy("SALES").sort(desc("SALES")).take(5).foreach(println)
> println ("\nFinished at"); HiveContext.sql("SELECT
> FROM_unixtime(unix_timestamp(), 'dd/MM/ HH:mm:ss.ss')
> ").collect.foreach(println)
> sys.exit
>
>
>
> However The first query results are slightly different in SQL and FP
> (may be the first query code in FP is not exactly correct?) and more
> importantly the FP takes order of magnitude longer compared to SQL (8
> minutes compared to less than a minute). I am not surprised as I expected
> Functional Programming has to flatten up all those method calls and 
> convert
> them to SQL?
>
> *The standard SQL results*
>
>
>
> Started at
> [23/02/2016 23:55:30.30]
> res1: org.apache.spark.sql.DataFrame = [result: string]
> rs: org.apache.spark.sql.DataFrame = [calendar_month_desc: string,
> channel_desc: string, TotalSales: decimal(20,0)]
>
> first query
> [1998-01,Direct Sales,9161730]
> [1998-01,Internet,1248581]
> [1998-01,Partners,2409776]
> [1998-02,Direct Sales,9161840]
> [1998-02,Internet,1533193]
>
>
>
> second query
> [Direct Sales,9161840]
> [Internet,3977374]
> [Partners,3976291]
> 

Re: Using functional programming rather than SQL

2016-02-24 Thread Mohannad Ali
That is incorrect. HiveContext does not need a Hive instance to run.
On Feb 24, 2016 19:15, "Sabarish Sasidharan" <
sabarish.sasidha...@manthan.com> wrote:

> Yes
>
> Regards
> Sab
> On 24-Feb-2016 9:15 pm, "Koert Kuipers"  wrote:
>
>> are you saying that HiveContext.sql(...) runs on hive, and not on spark
>> sql?
>>
>> On Wed, Feb 24, 2016 at 1:27 AM, Sabarish Sasidharan <
>> sabarish.sasidha...@manthan.com> wrote:
>>
>>> When using SQL your full query, including the joins, were executed in
>>> Hive(or RDBMS) and only the results were brought into the Spark cluster. In
>>> the FP case, the data for the 3 tables is first pulled into the Spark
>>> cluster and then the join is executed.
>>>
>>> Thus the time difference.
>>>
>>> It's not immediately obvious why the results are different.
>>>
>>> Regards
>>> Sab
>>> On 24-Feb-2016 5:40 am, "Mich Talebzadeh" <
>>> mich.talebza...@cloudtechnologypartners.co.uk> wrote:
>>>


 Hi,

 First thanks everyone for their suggestions. Much appreciated.

 This was the original queries written in SQL and run against Spark-shell

 val HiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
 println ("\nStarted at"); HiveContext.sql("SELECT
 FROM_unixtime(unix_timestamp(), 'dd/MM/ HH:mm:ss.ss')
 ").collect.foreach(println)
 HiveContext.sql("use oraclehadoop")

 val rs = HiveContext.sql(
 """
 SELECT t.calendar_month_desc, c.channel_desc, SUM(s.amount_sold) AS
 TotalSales
 FROM smallsales s
 INNER JOIN times t
 ON s.time_id = t.time_id
 INNER JOIN channels c
 ON s.channel_id = c.channel_id
 GROUP BY t.calendar_month_desc, c.channel_desc
 """)
 rs.registerTempTable("tmp")
 println ("\nfirst query")
 HiveContext.sql("""
 SELECT calendar_month_desc AS MONTH, channel_desc AS CHANNEL, TotalSales
 from tmp
 ORDER BY MONTH, CHANNEL LIMIT 5
 """).collect.foreach(println)
 println ("\nsecond query")
 HiveContext.sql("""
 SELECT channel_desc AS CHANNEL, MAX(TotalSales)  AS SALES
 FROM tmp
 GROUP BY channel_desc
 order by SALES DESC LIMIT 5
 """).collect.foreach(println)
 println ("\nFinished at"); HiveContext.sql("SELECT
 FROM_unixtime(unix_timestamp(), 'dd/MM/ HH:mm:ss.ss')
 ").collect.foreach(println)
 sys.exit

 The second queries were written in FP as much as I could as below

 val HiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
 println ("\nStarted at"); HiveContext.sql("SELECT
 FROM_unixtime(unix_timestamp(), 'dd/MM/ HH:mm:ss.ss')
 ").collect.foreach(println)
 HiveContext.sql("use oraclehadoop")
 var s = HiveContext.sql("SELECT AMOUNT_SOLD, TIME_ID, CHANNEL_ID FROM
 sales")
 val c = HiveContext.sql("SELECT CHANNEL_ID, CHANNEL_DESC FROM channels")
 val t = HiveContext.sql("SELECT TIME_ID, CALENDAR_MONTH_DESC FROM
 times")
 val rs =
 s.join(t,"time_id").join(c,"channel_id").groupBy("calendar_month_desc","channel_desc").agg(sum("amount_sold").as("TotalSales"))
 println ("\nfirst query")
 val rs1 =
 rs.orderBy("calendar_month_desc","channel_desc").take(5).foreach(println)
 println ("\nsecond query")
 val rs2
 =rs.groupBy("channel_desc").agg(max("TotalSales").as("SALES")).orderBy("SALES").sort(desc("SALES")).take(5).foreach(println)
 println ("\nFinished at"); HiveContext.sql("SELECT
 FROM_unixtime(unix_timestamp(), 'dd/MM/ HH:mm:ss.ss')
 ").collect.foreach(println)
 sys.exit



 However The first query results are slightly different in SQL and FP
 (may be the first query code in FP is not exactly correct?) and more
 importantly the FP takes order of magnitude longer compared to SQL (8
 minutes compared to less than a minute). I am not surprised as I expected
 Functional Programming has to flatten up all those method calls and convert
 them to SQL?

 *The standard SQL results*



 Started at
 [23/02/2016 23:55:30.30]
 res1: org.apache.spark.sql.DataFrame = [result: string]
 rs: org.apache.spark.sql.DataFrame = [calendar_month_desc: string,
 channel_desc: string, TotalSales: decimal(20,0)]

 first query
 [1998-01,Direct Sales,9161730]
 [1998-01,Internet,1248581]
 [1998-01,Partners,2409776]
 [1998-02,Direct Sales,9161840]
 [1998-02,Internet,1533193]



 second query
 [Direct Sales,9161840]
 [Internet,3977374]
 [Partners,3976291]
 [Tele Sales,328760]

 Finished at
 [23/02/2016 23:56:11.11]

 *The FP results*

 Started at
 [23/02/2016 23:45:58.58]
 res1: org.apache.spark.sql.DataFrame = [result: string]
 s: org.apache.spark.sql.DataFrame = [AMOUNT_SOLD: decimal(10,0),
 TIME_ID: timestamp, CHANNEL_ID: bigint]
 c: org.apache.spark.sql.DataFrame = [CHANNEL_ID: double, CHANNEL_DESC:
 string]

Re: Using functional programming rather than SQL

2016-02-24 Thread Sabarish Sasidharan
Yes

Regards
Sab
On 24-Feb-2016 9:15 pm, "Koert Kuipers"  wrote:

> are you saying that HiveContext.sql(...) runs on hive, and not on spark
> sql?
>
> On Wed, Feb 24, 2016 at 1:27 AM, Sabarish Sasidharan <
> sabarish.sasidha...@manthan.com> wrote:
>
>> When using SQL your full query, including the joins, were executed in
>> Hive(or RDBMS) and only the results were brought into the Spark cluster. In
>> the FP case, the data for the 3 tables is first pulled into the Spark
>> cluster and then the join is executed.
>>
>> Thus the time difference.
>>
>> It's not immediately obvious why the results are different.
>>
>> Regards
>> Sab
>> On 24-Feb-2016 5:40 am, "Mich Talebzadeh" <
>> mich.talebza...@cloudtechnologypartners.co.uk> wrote:
>>
>>>
>>>
>>> Hi,
>>>
>>> First thanks everyone for their suggestions. Much appreciated.
>>>
>>> This was the original queries written in SQL and run against Spark-shell
>>>
>>> val HiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
>>> println ("\nStarted at"); HiveContext.sql("SELECT
>>> FROM_unixtime(unix_timestamp(), 'dd/MM/ HH:mm:ss.ss')
>>> ").collect.foreach(println)
>>> HiveContext.sql("use oraclehadoop")
>>>
>>> val rs = HiveContext.sql(
>>> """
>>> SELECT t.calendar_month_desc, c.channel_desc, SUM(s.amount_sold) AS
>>> TotalSales
>>> FROM smallsales s
>>> INNER JOIN times t
>>> ON s.time_id = t.time_id
>>> INNER JOIN channels c
>>> ON s.channel_id = c.channel_id
>>> GROUP BY t.calendar_month_desc, c.channel_desc
>>> """)
>>> rs.registerTempTable("tmp")
>>> println ("\nfirst query")
>>> HiveContext.sql("""
>>> SELECT calendar_month_desc AS MONTH, channel_desc AS CHANNEL, TotalSales
>>> from tmp
>>> ORDER BY MONTH, CHANNEL LIMIT 5
>>> """).collect.foreach(println)
>>> println ("\nsecond query")
>>> HiveContext.sql("""
>>> SELECT channel_desc AS CHANNEL, MAX(TotalSales)  AS SALES
>>> FROM tmp
>>> GROUP BY channel_desc
>>> order by SALES DESC LIMIT 5
>>> """).collect.foreach(println)
>>> println ("\nFinished at"); HiveContext.sql("SELECT
>>> FROM_unixtime(unix_timestamp(), 'dd/MM/ HH:mm:ss.ss')
>>> ").collect.foreach(println)
>>> sys.exit
>>>
>>> The second queries were written in FP as much as I could as below
>>>
>>> val HiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
>>> println ("\nStarted at"); HiveContext.sql("SELECT
>>> FROM_unixtime(unix_timestamp(), 'dd/MM/ HH:mm:ss.ss')
>>> ").collect.foreach(println)
>>> HiveContext.sql("use oraclehadoop")
>>> var s = HiveContext.sql("SELECT AMOUNT_SOLD, TIME_ID, CHANNEL_ID FROM
>>> sales")
>>> val c = HiveContext.sql("SELECT CHANNEL_ID, CHANNEL_DESC FROM channels")
>>> val t = HiveContext.sql("SELECT TIME_ID, CALENDAR_MONTH_DESC FROM times")
>>> val rs =
>>> s.join(t,"time_id").join(c,"channel_id").groupBy("calendar_month_desc","channel_desc").agg(sum("amount_sold").as("TotalSales"))
>>> println ("\nfirst query")
>>> val rs1 =
>>> rs.orderBy("calendar_month_desc","channel_desc").take(5).foreach(println)
>>> println ("\nsecond query")
>>> val rs2
>>> =rs.groupBy("channel_desc").agg(max("TotalSales").as("SALES")).orderBy("SALES").sort(desc("SALES")).take(5).foreach(println)
>>> println ("\nFinished at"); HiveContext.sql("SELECT
>>> FROM_unixtime(unix_timestamp(), 'dd/MM/ HH:mm:ss.ss')
>>> ").collect.foreach(println)
>>> sys.exit
>>>
>>>
>>>
>>> However The first query results are slightly different in SQL and FP
>>> (may be the first query code in FP is not exactly correct?) and more
>>> importantly the FP takes order of magnitude longer compared to SQL (8
>>> minutes compared to less than a minute). I am not surprised as I expected
>>> Functional Programming has to flatten up all those method calls and convert
>>> them to SQL?
>>>
>>> *The standard SQL results*
>>>
>>>
>>>
>>> Started at
>>> [23/02/2016 23:55:30.30]
>>> res1: org.apache.spark.sql.DataFrame = [result: string]
>>> rs: org.apache.spark.sql.DataFrame = [calendar_month_desc: string,
>>> channel_desc: string, TotalSales: decimal(20,0)]
>>>
>>> first query
>>> [1998-01,Direct Sales,9161730]
>>> [1998-01,Internet,1248581]
>>> [1998-01,Partners,2409776]
>>> [1998-02,Direct Sales,9161840]
>>> [1998-02,Internet,1533193]
>>>
>>>
>>>
>>> second query
>>> [Direct Sales,9161840]
>>> [Internet,3977374]
>>> [Partners,3976291]
>>> [Tele Sales,328760]
>>>
>>> Finished at
>>> [23/02/2016 23:56:11.11]
>>>
>>> *The FP results*
>>>
>>> Started at
>>> [23/02/2016 23:45:58.58]
>>> res1: org.apache.spark.sql.DataFrame = [result: string]
>>> s: org.apache.spark.sql.DataFrame = [AMOUNT_SOLD: decimal(10,0),
>>> TIME_ID: timestamp, CHANNEL_ID: bigint]
>>> c: org.apache.spark.sql.DataFrame = [CHANNEL_ID: double, CHANNEL_DESC:
>>> string]
>>> t: org.apache.spark.sql.DataFrame = [TIME_ID: timestamp,
>>> CALENDAR_MONTH_DESC: string]
>>> rs: org.apache.spark.sql.DataFrame = [calendar_month_desc: string,
>>> channel_desc: string, TotalSales: decimal(20,0)]
>>>
>>> first query
>>> [1998-01,Direct Sales,9086830]
>>> [1998-01,Internet,1247641]

Re: Using functional programming rather than SQL

2016-02-24 Thread Koert Kuipers
are you saying that HiveContext.sql(...) runs on hive, and not on spark sql?

On Wed, Feb 24, 2016 at 1:27 AM, Sabarish Sasidharan <
sabarish.sasidha...@manthan.com> wrote:

> When using SQL your full query, including the joins, were executed in
> Hive(or RDBMS) and only the results were brought into the Spark cluster. In
> the FP case, the data for the 3 tables is first pulled into the Spark
> cluster and then the join is executed.
>
> Thus the time difference.
>
> It's not immediately obvious why the results are different.
>
> Regards
> Sab
> On 24-Feb-2016 5:40 am, "Mich Talebzadeh" <
> mich.talebza...@cloudtechnologypartners.co.uk> wrote:
>
>>
>>
>> Hi,
>>
>> First thanks everyone for their suggestions. Much appreciated.
>>
>> This was the original queries written in SQL and run against Spark-shell
>>
>> val HiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
>> println ("\nStarted at"); HiveContext.sql("SELECT
>> FROM_unixtime(unix_timestamp(), 'dd/MM/ HH:mm:ss.ss')
>> ").collect.foreach(println)
>> HiveContext.sql("use oraclehadoop")
>>
>> val rs = HiveContext.sql(
>> """
>> SELECT t.calendar_month_desc, c.channel_desc, SUM(s.amount_sold) AS
>> TotalSales
>> FROM smallsales s
>> INNER JOIN times t
>> ON s.time_id = t.time_id
>> INNER JOIN channels c
>> ON s.channel_id = c.channel_id
>> GROUP BY t.calendar_month_desc, c.channel_desc
>> """)
>> rs.registerTempTable("tmp")
>> println ("\nfirst query")
>> HiveContext.sql("""
>> SELECT calendar_month_desc AS MONTH, channel_desc AS CHANNEL, TotalSales
>> from tmp
>> ORDER BY MONTH, CHANNEL LIMIT 5
>> """).collect.foreach(println)
>> println ("\nsecond query")
>> HiveContext.sql("""
>> SELECT channel_desc AS CHANNEL, MAX(TotalSales)  AS SALES
>> FROM tmp
>> GROUP BY channel_desc
>> order by SALES DESC LIMIT 5
>> """).collect.foreach(println)
>> println ("\nFinished at"); HiveContext.sql("SELECT
>> FROM_unixtime(unix_timestamp(), 'dd/MM/ HH:mm:ss.ss')
>> ").collect.foreach(println)
>> sys.exit
>>
>> The second queries were written in FP as much as I could as below
>>
>> val HiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
>> println ("\nStarted at"); HiveContext.sql("SELECT
>> FROM_unixtime(unix_timestamp(), 'dd/MM/ HH:mm:ss.ss')
>> ").collect.foreach(println)
>> HiveContext.sql("use oraclehadoop")
>> var s = HiveContext.sql("SELECT AMOUNT_SOLD, TIME_ID, CHANNEL_ID FROM
>> sales")
>> val c = HiveContext.sql("SELECT CHANNEL_ID, CHANNEL_DESC FROM channels")
>> val t = HiveContext.sql("SELECT TIME_ID, CALENDAR_MONTH_DESC FROM times")
>> val rs =
>> s.join(t,"time_id").join(c,"channel_id").groupBy("calendar_month_desc","channel_desc").agg(sum("amount_sold").as("TotalSales"))
>> println ("\nfirst query")
>> val rs1 =
>> rs.orderBy("calendar_month_desc","channel_desc").take(5).foreach(println)
>> println ("\nsecond query")
>> val rs2
>> =rs.groupBy("channel_desc").agg(max("TotalSales").as("SALES")).orderBy("SALES").sort(desc("SALES")).take(5).foreach(println)
>> println ("\nFinished at"); HiveContext.sql("SELECT
>> FROM_unixtime(unix_timestamp(), 'dd/MM/ HH:mm:ss.ss')
>> ").collect.foreach(println)
>> sys.exit
>>
>>
>>
>> However The first query results are slightly different in SQL and FP (may
>> be the first query code in FP is not exactly correct?) and more importantly
>> the FP takes order of magnitude longer compared to SQL (8 minutes compared
>> to less than a minute). I am not surprised as I expected Functional
>> Programming has to flatten up all those method calls and convert them to
>> SQL?
>>
>> *The standard SQL results*
>>
>>
>>
>> Started at
>> [23/02/2016 23:55:30.30]
>> res1: org.apache.spark.sql.DataFrame = [result: string]
>> rs: org.apache.spark.sql.DataFrame = [calendar_month_desc: string,
>> channel_desc: string, TotalSales: decimal(20,0)]
>>
>> first query
>> [1998-01,Direct Sales,9161730]
>> [1998-01,Internet,1248581]
>> [1998-01,Partners,2409776]
>> [1998-02,Direct Sales,9161840]
>> [1998-02,Internet,1533193]
>>
>>
>>
>> second query
>> [Direct Sales,9161840]
>> [Internet,3977374]
>> [Partners,3976291]
>> [Tele Sales,328760]
>>
>> Finished at
>> [23/02/2016 23:56:11.11]
>>
>> *The FP results*
>>
>> Started at
>> [23/02/2016 23:45:58.58]
>> res1: org.apache.spark.sql.DataFrame = [result: string]
>> s: org.apache.spark.sql.DataFrame = [AMOUNT_SOLD: decimal(10,0), TIME_ID:
>> timestamp, CHANNEL_ID: bigint]
>> c: org.apache.spark.sql.DataFrame = [CHANNEL_ID: double, CHANNEL_DESC:
>> string]
>> t: org.apache.spark.sql.DataFrame = [TIME_ID: timestamp,
>> CALENDAR_MONTH_DESC: string]
>> rs: org.apache.spark.sql.DataFrame = [calendar_month_desc: string,
>> channel_desc: string, TotalSales: decimal(20,0)]
>>
>> first query
>> [1998-01,Direct Sales,9086830]
>> [1998-01,Internet,1247641]
>> [1998-01,Partners,2393567]
>> [1998-02,Direct Sales,9161840]
>> [1998-02,Internet,1533193]
>> rs1: Unit = ()
>>
>> second query
>> [Direct Sales,9161840]
>> [Internet,3977374]
>> [Partners,3976291]
>> [Tele Sales,328760]
>> rs2: 

Re: Using functional programming rather than SQL

2016-02-23 Thread Sabarish Sasidharan
When using SQL, your full query, including the joins, was executed in
Hive (or the RDBMS) and only the results were brought into the Spark cluster. In
the FP case, the data for the 3 tables is first pulled into the Spark
cluster and then the join is executed.

Thus the time difference.

It's not immediately obvious why the results are different.
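
One way to pin down which aggregated rows differ is to build both results and
diff them with except(); a hedged sketch, assuming the HiveContext val and the
tables from the scripts in this thread (the val names are only illustrative):

import org.apache.spark.sql.functions._
val sqlRows = HiveContext.sql("""
SELECT t.calendar_month_desc, c.channel_desc, SUM(s.amount_sold) AS TotalSales
FROM smallsales s
INNER JOIN times t ON s.time_id = t.time_id
INNER JOIN channels c ON s.channel_id = c.channel_id
GROUP BY t.calendar_month_desc, c.channel_desc
""")
val fpRows = HiveContext.table("sales")
  .join(HiveContext.table("times"), "time_id")
  .join(HiveContext.table("channels"), "channel_id")
  .groupBy("calendar_month_desc", "channel_desc")
  .agg(sum("amount_sold").as("TotalSales"))
// rows present in one result but not the other
sqlRows.except(fpRows).show()
fpRows.except(sqlRows).show()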

Regards
Sab
On 24-Feb-2016 5:40 am, "Mich Talebzadeh" <
mich.talebza...@cloudtechnologypartners.co.uk> wrote:

>
>
> Hi,
>
> First thanks everyone for their suggestions. Much appreciated.
>
> This was the original queries written in SQL and run against Spark-shell
>
> val HiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
> println ("\nStarted at"); HiveContext.sql("SELECT
> FROM_unixtime(unix_timestamp(), 'dd/MM/ HH:mm:ss.ss')
> ").collect.foreach(println)
> HiveContext.sql("use oraclehadoop")
>
> val rs = HiveContext.sql(
> """
> SELECT t.calendar_month_desc, c.channel_desc, SUM(s.amount_sold) AS
> TotalSales
> FROM smallsales s
> INNER JOIN times t
> ON s.time_id = t.time_id
> INNER JOIN channels c
> ON s.channel_id = c.channel_id
> GROUP BY t.calendar_month_desc, c.channel_desc
> """)
> rs.registerTempTable("tmp")
> println ("\nfirst query")
> HiveContext.sql("""
> SELECT calendar_month_desc AS MONTH, channel_desc AS CHANNEL, TotalSales
> from tmp
> ORDER BY MONTH, CHANNEL LIMIT 5
> """).collect.foreach(println)
> println ("\nsecond query")
> HiveContext.sql("""
> SELECT channel_desc AS CHANNEL, MAX(TotalSales)  AS SALES
> FROM tmp
> GROUP BY channel_desc
> order by SALES DESC LIMIT 5
> """).collect.foreach(println)
> println ("\nFinished at"); HiveContext.sql("SELECT
> FROM_unixtime(unix_timestamp(), 'dd/MM/ HH:mm:ss.ss')
> ").collect.foreach(println)
> sys.exit
>
> The second queries were written in FP as much as I could as below
>
> val HiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
> println ("\nStarted at"); HiveContext.sql("SELECT
> FROM_unixtime(unix_timestamp(), 'dd/MM/ HH:mm:ss.ss')
> ").collect.foreach(println)
> HiveContext.sql("use oraclehadoop")
> var s = HiveContext.sql("SELECT AMOUNT_SOLD, TIME_ID, CHANNEL_ID FROM
> sales")
> val c = HiveContext.sql("SELECT CHANNEL_ID, CHANNEL_DESC FROM channels")
> val t = HiveContext.sql("SELECT TIME_ID, CALENDAR_MONTH_DESC FROM times")
> val rs =
> s.join(t,"time_id").join(c,"channel_id").groupBy("calendar_month_desc","channel_desc").agg(sum("amount_sold").as("TotalSales"))
> println ("\nfirst query")
> val rs1 =
> rs.orderBy("calendar_month_desc","channel_desc").take(5).foreach(println)
> println ("\nsecond query")
> val rs2
> =rs.groupBy("channel_desc").agg(max("TotalSales").as("SALES")).orderBy("SALES").sort(desc("SALES")).take(5).foreach(println)
> println ("\nFinished at"); HiveContext.sql("SELECT
> FROM_unixtime(unix_timestamp(), 'dd/MM/ HH:mm:ss.ss')
> ").collect.foreach(println)
> sys.exit
>
>
>
> However The first query results are slightly different in SQL and FP (may
> be the first query code in FP is not exactly correct?) and more importantly
> the FP takes order of magnitude longer compared to SQL (8 minutes compared
> to less than a minute). I am not surprised as I expected Functional
> Programming has to flatten up all those method calls and convert them to
> SQL?
>
> *The standard SQL results*
>
>
>
> Started at
> [23/02/2016 23:55:30.30]
> res1: org.apache.spark.sql.DataFrame = [result: string]
> rs: org.apache.spark.sql.DataFrame = [calendar_month_desc: string,
> channel_desc: string, TotalSales: decimal(20,0)]
>
> first query
> [1998-01,Direct Sales,9161730]
> [1998-01,Internet,1248581]
> [1998-01,Partners,2409776]
> [1998-02,Direct Sales,9161840]
> [1998-02,Internet,1533193]
>
>
>
> second query
> [Direct Sales,9161840]
> [Internet,3977374]
> [Partners,3976291]
> [Tele Sales,328760]
>
> Finished at
> [23/02/2016 23:56:11.11]
>
> *The FP results*
>
> Started at
> [23/02/2016 23:45:58.58]
> res1: org.apache.spark.sql.DataFrame = [result: string]
> s: org.apache.spark.sql.DataFrame = [AMOUNT_SOLD: decimal(10,0), TIME_ID:
> timestamp, CHANNEL_ID: bigint]
> c: org.apache.spark.sql.DataFrame = [CHANNEL_ID: double, CHANNEL_DESC:
> string]
> t: org.apache.spark.sql.DataFrame = [TIME_ID: timestamp,
> CALENDAR_MONTH_DESC: string]
> rs: org.apache.spark.sql.DataFrame = [calendar_month_desc: string,
> channel_desc: string, TotalSales: decimal(20,0)]
>
> first query
> [1998-01,Direct Sales,9086830]
> [1998-01,Internet,1247641]
> [1998-01,Partners,2393567]
> [1998-02,Direct Sales,9161840]
> [1998-02,Internet,1533193]
> rs1: Unit = ()
>
> second query
> [Direct Sales,9161840]
> [Internet,3977374]
> [Partners,3976291]
> [Tele Sales,328760]
> rs2: Unit = ()
>
> Finished at
> [23/02/2016 23:53:42.42]
>
>
>
> On 22/02/2016 23:16, Mich Talebzadeh wrote:
>
> Hi,
>
> I have data stored in Hive tables that I want to do simple manipulation.
>
> Currently in Spark I perform the following with getting the result set
> using SQL from Hive tables, registering as a temporary table in 

Re: Using functional programming rather than SQL

2016-02-23 Thread Koert Kuipers
​instead of:
var s = HiveContext.sql("SELECT AMOUNT_SOLD, TIME_ID, CHANNEL_ID FROM
sales")
you should be able to do something like:
val s = HiveContext.table("sales").select("AMOUNT_SOLD", "TIME_ID",
"CHANNEL_ID")

its not obvious to me why the dataframe (aka FP) version would be
significantly slower, they should translate into similar/same execution
plans. the explain method on DataFrame should show you the plans. ​
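
A hedged sketch of the whole aggregation written that way, reusing the
HiveContext val and the table/column names from earlier in the thread, with
explain(true) at the end to compare against the plan of the SQL version:

import org.apache.spark.sql.functions._
val s = HiveContext.table("sales").select("AMOUNT_SOLD", "TIME_ID", "CHANNEL_ID")
val c = HiveContext.table("channels").select("CHANNEL_ID", "CHANNEL_DESC")
val t = HiveContext.table("times").select("TIME_ID", "CALENDAR_MONTH_DESC")

val rs = s.join(t, "time_id")
  .join(c, "channel_id")
  .groupBy("calendar_month_desc", "channel_desc")
  .agg(sum("amount_sold").as("TotalSales"))

// if this plan differs materially from the SQL version's plan, the difference
// in plans (not SQL versus FP as such) is what explains the timing gap
rs.explain(true)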



On Tue, Feb 23, 2016 at 7:09 PM, Mich Talebzadeh <
mich.talebza...@cloudtechnologypartners.co.uk> wrote:

>
>
> Hi,
>
> First thanks everyone for their suggestions. Much appreciated.
>
> This was the original queries written in SQL and run against Spark-shell
>
> val HiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
> println ("\nStarted at"); HiveContext.sql("SELECT
> FROM_unixtime(unix_timestamp(), 'dd/MM/ HH:mm:ss.ss')
> ").collect.foreach(println)
> HiveContext.sql("use oraclehadoop")
>
> val rs = HiveContext.sql(
> """
> SELECT t.calendar_month_desc, c.channel_desc, SUM(s.amount_sold) AS
> TotalSales
> FROM smallsales s
> INNER JOIN times t
> ON s.time_id = t.time_id
> INNER JOIN channels c
> ON s.channel_id = c.channel_id
> GROUP BY t.calendar_month_desc, c.channel_desc
> """)
> rs.registerTempTable("tmp")
> println ("\nfirst query")
> HiveContext.sql("""
> SELECT calendar_month_desc AS MONTH, channel_desc AS CHANNEL, TotalSales
> from tmp
> ORDER BY MONTH, CHANNEL LIMIT 5
> """).collect.foreach(println)
> println ("\nsecond query")
> HiveContext.sql("""
> SELECT channel_desc AS CHANNEL, MAX(TotalSales)  AS SALES
> FROM tmp
> GROUP BY channel_desc
> order by SALES DESC LIMIT 5
> """).collect.foreach(println)
> println ("\nFinished at"); HiveContext.sql("SELECT
> FROM_unixtime(unix_timestamp(), 'dd/MM/ HH:mm:ss.ss')
> ").collect.foreach(println)
> sys.exit
>
> The second queries were written in FP as much as I could as below
>
> val HiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
> println ("\nStarted at"); HiveContext.sql("SELECT
> FROM_unixtime(unix_timestamp(), 'dd/MM/ HH:mm:ss.ss')
> ").collect.foreach(println)
> HiveContext.sql("use oraclehadoop")
> var s = HiveContext.sql("SELECT AMOUNT_SOLD, TIME_ID, CHANNEL_ID FROM
> sales")
> val c = HiveContext.sql("SELECT CHANNEL_ID, CHANNEL_DESC FROM channels")
> val t = HiveContext.sql("SELECT TIME_ID, CALENDAR_MONTH_DESC FROM times")
> val rs =
> s.join(t,"time_id").join(c,"channel_id").groupBy("calendar_month_desc","channel_desc").agg(sum("amount_sold").as("TotalSales"))
> println ("\nfirst query")
> val rs1 =
> rs.orderBy("calendar_month_desc","channel_desc").take(5).foreach(println)
> println ("\nsecond query")
> val rs2
> =rs.groupBy("channel_desc").agg(max("TotalSales").as("SALES")).orderBy("SALES").sort(desc("SALES")).take(5).foreach(println)
> println ("\nFinished at"); HiveContext.sql("SELECT
> FROM_unixtime(unix_timestamp(), 'dd/MM/ HH:mm:ss.ss')
> ").collect.foreach(println)
> sys.exit
>
>
>
> However The first query results are slightly different in SQL and FP (may
> be the first query code in FP is not exactly correct?) and more importantly
> the FP takes order of magnitude longer compared to SQL (8 minutes compared
> to less than a minute). I am not surprised as I expected Functional
> Programming has to flatten up all those method calls and convert them to
> SQL?
>
> *The standard SQL results*
>
>
>
> Started at
> [23/02/2016 23:55:30.30]
> res1: org.apache.spark.sql.DataFrame = [result: string]
> rs: org.apache.spark.sql.DataFrame = [calendar_month_desc: string,
> channel_desc: string, TotalSales: decimal(20,0)]
>
> first query
> [1998-01,Direct Sales,9161730]
> [1998-01,Internet,1248581]
> [1998-01,Partners,2409776]
> [1998-02,Direct Sales,9161840]
> [1998-02,Internet,1533193]
>
>
>
> second query
> [Direct Sales,9161840]
> [Internet,3977374]
> [Partners,3976291]
> [Tele Sales,328760]
>
> Finished at
> [23/02/2016 23:56:11.11]
>
> *The FP results*
>
> Started at
> [23/02/2016 23:45:58.58]
> res1: org.apache.spark.sql.DataFrame = [result: string]
> s: org.apache.spark.sql.DataFrame = [AMOUNT_SOLD: decimal(10,0), TIME_ID:
> timestamp, CHANNEL_ID: bigint]
> c: org.apache.spark.sql.DataFrame = [CHANNEL_ID: double, CHANNEL_DESC:
> string]
> t: org.apache.spark.sql.DataFrame = [TIME_ID: timestamp,
> CALENDAR_MONTH_DESC: string]
> rs: org.apache.spark.sql.DataFrame = [calendar_month_desc: string,
> channel_desc: string, TotalSales: decimal(20,0)]
>
> first query
> [1998-01,Direct Sales,9086830]
> [1998-01,Internet,1247641]
> [1998-01,Partners,2393567]
> [1998-02,Direct Sales,9161840]
> [1998-02,Internet,1533193]
> rs1: Unit = ()
>
> second query
> [Direct Sales,9161840]
> [Internet,3977374]
> [Partners,3976291]
> [Tele Sales,328760]
> rs2: Unit = ()
>
> Finished at
> [23/02/2016 23:53:42.42]
>
>
>
> On 22/02/2016 23:16, Mich Talebzadeh wrote:
>
> Hi,
>
> I have data stored in Hive tables that I want to do simple manipulation.
>
> Currently in Spark I perform the following with getting the 

Re: Using functional programming rather than SQL

2016-02-23 Thread Mich Talebzadeh
 

Hi, 

First thanks everyone for their suggestions. Much appreciated. 

These were the original queries, written in SQL and run against Spark-shell


val HiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
println ("nStarted at"); HiveContext.sql("SELECT
FROM_unixtime(unix_timestamp(), 'dd/MM/ HH:mm:ss.ss')
").collect.foreach(println)
HiveContext.sql("use oraclehadoop") 

val rs = HiveContext.sql(
"""
SELECT t.calendar_month_desc, c.channel_desc, SUM(s.amount_sold) AS
TotalSales
FROM smallsales s
INNER JOIN times t
ON s.time_id = t.time_id
INNER JOIN channels c
ON s.channel_id = c.channel_id
GROUP BY t.calendar_month_desc, c.channel_desc
""")
rs.registerTempTable("tmp")
println ("nfirst query")
HiveContext.sql("""
SELECT calendar_month_desc AS MONTH, channel_desc AS CHANNEL, TotalSales
from tmp
ORDER BY MONTH, CHANNEL LIMIT 5
""").collect.foreach(println)
println ("nsecond query")
HiveContext.sql("""
SELECT channel_desc AS CHANNEL, MAX(TotalSales) AS SALES
FROM tmp
GROUP BY channel_desc
order by SALES DESC LIMIT 5
""").collect.foreach(println)
println ("nFinished at"); HiveContext.sql("SELECT
FROM_unixtime(unix_timestamp(), 'dd/MM/ HH:mm:ss.ss')
").collect.foreach(println)
sys.exit 

The second set of queries was written in FP, as much as I could, as below 

val HiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
println ("nStarted at"); HiveContext.sql("SELECT
FROM_unixtime(unix_timestamp(), 'dd/MM/ HH:mm:ss.ss')
").collect.foreach(println)
HiveContext.sql("use oraclehadoop")
var s = HiveContext.sql("SELECT AMOUNT_SOLD, TIME_ID, CHANNEL_ID FROM
sales")
val c = HiveContext.sql("SELECT CHANNEL_ID, CHANNEL_DESC FROM channels")
val t = HiveContext.sql("SELECT TIME_ID, CALENDAR_MONTH_DESC FROM
times")
val rs =
s.join(t,"time_id").join(c,"channel_id").groupBy("calendar_month_desc","channel_desc").agg(sum("amount_sold").as("TotalSales"))
println ("nfirst query")
val rs1 =
rs.orderBy("calendar_month_desc","channel_desc").take(5).foreach(println)
println ("nsecond query")
val rs2
=rs.groupBy("channel_desc").agg(max("TotalSales").as("SALES")).orderBy("SALES").sort(desc("SALES")).take(5).foreach(println)
println ("nFinished at"); HiveContext.sql("SELECT
FROM_unixtime(unix_timestamp(), 'dd/MM/ HH:mm:ss.ss')
").collect.foreach(println)
sys.exit 

However, the first query results are slightly different in SQL and FP
(maybe the first query code in FP is not exactly correct?) and, more
importantly, the FP version takes an order of magnitude longer than SQL (8
minutes compared to less than a minute). I am not surprised, as I
expected that Functional Programming has to flatten all those method calls
and convert them to SQL? 

The standard SQL results 

Started at
[23/02/2016 23:55:30.30]
res1: org.apache.spark.sql.DataFrame = [result: string]
rs: org.apache.spark.sql.DataFrame = [calendar_month_desc: string,
channel_desc: string, TotalSales: decimal(20,0)] 

first query
[1998-01,Direct Sales,9161730]
[1998-01,Internet,1248581]
[1998-01,Partners,2409776]
[1998-02,Direct Sales,9161840]
[1998-02,Internet,1533193] 

second query
[Direct Sales,9161840]
[Internet,3977374]
[Partners,3976291]
[Tele Sales,328760] 

Finished at
[23/02/2016 23:56:11.11] 

The FP results 

Started at
[23/02/2016 23:45:58.58]
res1: org.apache.spark.sql.DataFrame = [result: string]
s: org.apache.spark.sql.DataFrame = [AMOUNT_SOLD: decimal(10,0),
TIME_ID: timestamp, CHANNEL_ID: bigint]
c: org.apache.spark.sql.DataFrame = [CHANNEL_ID: double, CHANNEL_DESC:
string]
t: org.apache.spark.sql.DataFrame = [TIME_ID: timestamp,
CALENDAR_MONTH_DESC: string]
rs: org.apache.spark.sql.DataFrame = [calendar_month_desc: string,
channel_desc: string, TotalSales: decimal(20,0)] 

first query
[1998-01,Direct Sales,9086830]
[1998-01,Internet,1247641]
[1998-01,Partners,2393567]
[1998-02,Direct Sales,9161840]
[1998-02,Internet,1533193]
rs1: Unit = () 

second query
[Direct Sales,9161840]
[Internet,3977374]
[Partners,3976291]
[Tele Sales,328760]
rs2: Unit = () 

Finished at
[23/02/2016 23:53:42.42] 

On 22/02/2016 23:16, Mich Talebzadeh wrote: 

> Hi, 
> 
> I have data stored in Hive tables that I want to do simple manipulation. 
> 
> Currently in Spark I perform the following with getting the result set using 
> SQL from Hive tables, registering as a temporary table in Spark 
> 
> Now Ideally I can get the result set into a DF and work on DF to slice and 
> dice the data using functional programming with filter, map. split etc. 
> 
> I wanted to get some ideas on how to go about it. 
> 
> thanks 
> 
> val HiveContext = new org.apache.spark.sql.hive.HiveContext(sc) 
> 
> HiveContext.sql("use oraclehadoop")
> val rs = HiveContext.sql("""SELECT t.calendar_month_desc, c.channel_desc, 
> SUM(s.amount_sold) AS TotalSales
> FROM smallsales s, times t, channels c
> WHERE s.time_id = t.time_id
> AND s.channel_id = c.channel_id
> GROUP BY t.calendar_month_desc, c.channel_desc
> """)
> rs.registerTempTable("tmp") 
> 
> HiveContext.sql("""
> SELECT calendar_month_desc AS 

Re: Using functional programming rather than SQL

2016-02-22 Thread Michał Zieliński
Your SQL query will look something like that in DataFrames (but as Ted
said, check the docs to see the signatures).

smallsales
  .join(times, "time_id")
  .join(channels, "channel_id")
  .groupBy("calendar_month_desc", "channel_desc")
  .agg(sum(col("amount_sold")).as("TotalSales"))

On 23 February 2016 at 01:50, Ted Yu  wrote:

> Mich:
> Please refer to the following test suite for examples on various DataFrame
> operations:
>
> sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
>
> On Mon, Feb 22, 2016 at 4:39 PM, Mich Talebzadeh <
> mich.talebza...@cloudtechnologypartners.co.uk> wrote:
>
>> Thanks Dean.
>>
>> I gather if I wanted to get the whole thing through FP with little or no
>> use of SQL, then for the first instance as I get the data set from Hive
>> (i.e,
>>
>> val rs = HiveContext.sql("""SELECT t.calendar_month_desc, c.channel_desc,
>> SUM(s.amount_sold) AS TotalSales
>> FROM smallsales s, times t, channels c
>> WHERE s.time_id = t.time_id
>> AND   s.channel_id = c.channel_id
>> GROUP BY t.calendar_month_desc, c.channel_desc
>> """)
>>
>> I can even possibly use DF to do the above sql joins because what I am
>> doing above can also be done in DF without SQL use? Is that possible? Or I
>> have to use some form of SQL?
>>
>>
>>
>> The rest I can do simply using DFs.
>>
>>
>>
>> Thanks
>>
>>
>>
>>
>>
>> On 23/02/2016 00:26, Dean Wampler wrote:
>>
>> Kevin gave you the answer you need, but I'd like to comment on your
>> subject line. SQL is a limited form of FP. Sure, there are no anonymous
>> functions and other limitations, but it's declarative, like good FP
>> programs should be, and it offers an important subset of the operators
>> ("combinators") you want.
>>
>> Also, on a practical note, use the DataFrame API whenever you can, rather
>> than dropping down to the RDD API, because the DataFrame API is far more
>> performant. It's a classic case where restricting your options enables more
>> aggressive optimizations behind the scenes. Michael Armbrust's talk at Spark
>> Summit East nicely made this point.
>> http://www.slideshare.net/databricks/structuring-spark-dataframes-datasets-and-streaming
>>
>> dean
>>
>> Dean Wampler, Ph.D.
>> Author: Programming Scala, 2nd Edition
>>  (O'Reilly)
>> Typesafe 
>> @deanwampler 
>> http://polyglotprogramming.com
>>
>> On Mon, Feb 22, 2016 at 6:45 PM, Kevin Mellott > > wrote:
>>
>>> In your example, the *rs* instance should be a DataFrame object. In
>>> other words, the result of *HiveContext.sql* is a DataFrame that you
>>> can manipulate using *filter, map, *etc.
>>>
>>>
>>> http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.hive.HiveContext
>>>
>>>
>>> On Mon, Feb 22, 2016 at 5:16 PM, Mich Talebzadeh <
>>> mich.talebza...@cloudtechnologypartners.co.uk> wrote:
>>>
 Hi,

 I have data stored in Hive tables that I want to do simple manipulation.

 Currently in Spark I perform the following with getting the result set
 using SQL from Hive tables, registering as a temporary table in Spark

 Now Ideally I can get the result set into a DF and work on DF to slice
 and dice the data using functional programming with filter, map. split etc.

 I wanted to get some ideas on how to go about it.

 thanks

 val HiveContext = new org.apache.spark.sql.hive.HiveContext(sc)

 HiveContext.sql("use oraclehadoop")
 val rs = HiveContext.sql("""SELECT t.calendar_month_desc,
 c.channel_desc, SUM(s.amount_sold) AS TotalSales
 FROM smallsales s, times t, channels c
 WHERE s.time_id = t.time_id
 AND   s.channel_id = c.channel_id
 GROUP BY t.calendar_month_desc, c.channel_desc
 """)
 *rs.registerTempTable("tmp")*


 HiveContext.sql("""
 SELECT calendar_month_desc AS MONTH, channel_desc AS CHANNEL, TotalSales
 from tmp
 ORDER BY MONTH, CHANNEL
 """).collect.foreach(println)
 HiveContext.sql("""
 SELECT channel_desc AS CHANNEL, MAX(TotalSales)  AS SALES
 FROM tmp
 GROUP BY channel_desc
 order by SALES DESC
 """).collect.foreach(println)


 --

 Dr Mich Talebzadeh

 LinkedIn  
 https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
 http://talebzadehmich.wordpress.com

 NOTE: The information in this email is proprietary and confidential. This 
 message is for the designated recipient only, if you are not the intended 
 recipient, you should destroy it immediately. Any information in this 
 message shall not be understood as given or endorsed by Cloud Technology 
 Partners Ltd, its subsidiaries or their employees, unless expressly so 
 stated. It is the responsibility of the recipient to ensure that this 
 email is virus free, 

Re: Using functional programming rather than SQL

2016-02-22 Thread Ted Yu
Mich:
Please refer to the following test suite for examples on various DataFrame
operations:

sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala

On Mon, Feb 22, 2016 at 4:39 PM, Mich Talebzadeh <
mich.talebza...@cloudtechnologypartners.co.uk> wrote:

> Thanks Dean.
>
> I gather if I wanted to get the whole thing through FP with little or no
> use of SQL, then for the first instance as I get the data set from Hive
> (i.e,
>
> val rs = HiveContext.sql("""SELECT t.calendar_month_desc, c.channel_desc,
> SUM(s.amount_sold) AS TotalSales
> FROM smallsales s, times t, channels c
> WHERE s.time_id = t.time_id
> AND   s.channel_id = c.channel_id
> GROUP BY t.calendar_month_desc, c.channel_desc
> """)
>
> I can even possibly use DF to do the above sql joins because what I am
> doing above can also be done in DF without SQL use? Is that possible? Or I
> have to use some form of SQL?
>
>
>
> The rest I can do simply using DFs.
>
>
>
> Thanks
>
>
>
>
>
> On 23/02/2016 00:26, Dean Wampler wrote:
>
> Kevin gave you the answer you need, but I'd like to comment on your
> subject line. SQL is a limited form of FP. Sure, there are no anonymous
> functions and other limitations, but it's declarative, like good FP
> programs should be, and it offers an important subset of the operators
> ("combinators") you want.
>
> Also, on a practical note, use the DataFrame API whenever you can, rather
> than dropping down to the RDD API, because the DataFrame API is far more
> performant. It's a classic case where restricting your options enables more
> aggressive optimizations behind the scenes. Michael Armbrust's talk at Spark
> Summit East nicely made this point.
> http://www.slideshare.net/databricks/structuring-spark-dataframes-datasets-and-streaming
>
> dean
>
> Dean Wampler, Ph.D.
> Author: Programming Scala, 2nd Edition
>  (O'Reilly)
> Typesafe 
> @deanwampler 
> http://polyglotprogramming.com
>
> On Mon, Feb 22, 2016 at 6:45 PM, Kevin Mellott 
> wrote:
>
>> In your example, the *rs* instance should be a DataFrame object. In
>> other words, the result of *HiveContext.sql* is a DataFrame that you can
>> manipulate using *filter, map, *etc.
>>
>>
>> http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.hive.HiveContext
>>
>>
>> On Mon, Feb 22, 2016 at 5:16 PM, Mich Talebzadeh <
>> mich.talebza...@cloudtechnologypartners.co.uk> wrote:
>>
>>> Hi,
>>>
>>> I have data stored in Hive tables that I want to do simple manipulation.
>>>
>>> Currently in Spark I perform the following with getting the result set
>>> using SQL from Hive tables, registering as a temporary table in Spark
>>>
>>> Now Ideally I can get the result set into a DF and work on DF to slice
>>> and dice the data using functional programming with filter, map. split etc.
>>>
>>> I wanted to get some ideas on how to go about it.
>>>
>>> thanks
>>>
>>> val HiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
>>>
>>> HiveContext.sql("use oraclehadoop")
>>> val rs = HiveContext.sql("""SELECT t.calendar_month_desc,
>>> c.channel_desc, SUM(s.amount_sold) AS TotalSales
>>> FROM smallsales s, times t, channels c
>>> WHERE s.time_id = t.time_id
>>> AND   s.channel_id = c.channel_id
>>> GROUP BY t.calendar_month_desc, c.channel_desc
>>> """)
>>> *rs.registerTempTable("tmp")*
>>>
>>>
>>> HiveContext.sql("""
>>> SELECT calendar_month_desc AS MONTH, channel_desc AS CHANNEL, TotalSales
>>> from tmp
>>> ORDER BY MONTH, CHANNEL
>>> """).collect.foreach(println)
>>> HiveContext.sql("""
>>> SELECT channel_desc AS CHANNEL, MAX(TotalSales)  AS SALES
>>> FROM tmp
>>> GROUP BY channel_desc
>>> order by SALES DESC
>>> """).collect.foreach(println)
>>>
>>>
>>> --
>>>
>>> Dr Mich Talebzadeh
>>>
>>> LinkedIn  
>>> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>>> http://talebzadehmich.wordpress.com
>>>
>>> NOTE: The information in this email is proprietary and confidential. This 
>>> message is for the designated recipient only, if you are not the intended 
>>> recipient, you should destroy it immediately. Any information in this 
>>> message shall not be understood as given or endorsed by Cloud Technology 
>>> Partners Ltd, its subsidiaries or their employees, unless expressly so 
>>> stated. It is the responsibility of the recipient to ensure that this email 
>>> is virus free, therefore neither Cloud Technology partners Ltd, its 
>>> subsidiaries nor their employees accept any responsibility.
>>>
>>>
>>>
>
>
> --
>
> Dr Mich Talebzadeh
>
> LinkedIn  
> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
> http://talebzadehmich.wordpress.com
>
> NOTE: The information in this email is proprietary and confidential. This 
> message is for the designated recipient only, if you are not the intended 
> recipient, you should destroy it immediately. Any information in 

Re: Using functional programming rather than SQL

2016-02-22 Thread Mich Talebzadeh
 

Thanks Dean. 

I gather if I wanted to get the whole thing through FP with little or no
use of SQL, then for the first instance as I get the data set from Hive
(i.e, 

val rs = HiveContext.sql("""SELECT t.calendar_month_desc,
c.channel_desc, SUM(s.amount_sold) AS TotalSales
FROM smallsales s, times t, channels c
WHERE s.time_id = t.time_id
AND s.channel_id = c.channel_id
GROUP BY t.calendar_month_desc, c.channel_desc
""") 

Could I even use DFs to do the above SQL joins, i.e. can what I am doing
above also be done with the DataFrame API without any SQL? Is that possible,
or do I have to use some form of SQL? 
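
For reference, a rough sketch of what the same extraction and join could look
like with no SQL text at all, assuming the tables are reachable through
HiveContext.table and writing the join conditions out explicitly:

import org.apache.spark.sql.functions.sum

val s = HiveContext.table("smallsales")
val t = HiveContext.table("times")
val c = HiveContext.table("channels")

val rs = s.join(t, s("time_id") === t("time_id"))
          .join(c, s("channel_id") === c("channel_id"))
          .groupBy(t("calendar_month_desc"), c("channel_desc"))
          .agg(sum(s("amount_sold")).as("TotalSales"))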

The rest I can do simply using DFs. 

Thanks 

On 23/02/2016 00:26, Dean Wampler wrote: 

> Kevin gave you the answer you need, but I'd like to comment on your subject 
> line. SQL is a limited form of FP. Sure, there are no anonymous functions and 
> other limitations, but it's declarative, like good FP programs should be, and 
> it offers an important subset of the operators ("combinators") you want. 
> 
> Also, on a practical note, use the DataFrame API whenever you can, rather 
> than dropping down to the RDD API, because the DataFrame API is far more 
> performant. It's a classic case where restricting your options enables more 
> aggressive optimizations behind the scenes. Michael Armbrust's talk at Spark 
> Summit East nicely made this point. 
> http://www.slideshare.net/databricks/structuring-spark-dataframes-datasets-and-streaming
>  [3] 
> 
> dean 
> 
> Dean Wampler, Ph.D. 
> Author: Programming Scala, 2nd Edition [4] (O'Reilly)
> 
> Typesafe [5]
> @deanwampler [6] 
> http://polyglotprogramming.com [7] 
> 
> On Mon, Feb 22, 2016 at 6:45 PM, Kevin Mellott  
> wrote:
> 
> In your example, the _rs_ instance should be a DataFrame object. In other 
> words, the result of _HiveContext.sql_ is a DataFrame that you can manipulate 
> using _filter, map, _etc. 
> 
> http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.hive.HiveContext
>  [8] 
> 
> On Mon, Feb 22, 2016 at 5:16 PM, Mich Talebzadeh 
>  wrote:
> 
> Hi, 
> 
> I have data stored in Hive tables that I want to do simple manipulation. 
> 
> Currently in Spark I perform the following with getting the result set using 
> SQL from Hive tables, registering as a temporary table in Spark 
> 
> Now Ideally I can get the result set into a DF and work on DF to slice and 
> dice the data using functional programming with filter, map. split etc. 
> 
> I wanted to get some ideas on how to go about it. 
> 
> thanks 
> 
> val HiveContext = new org.apache.spark.sql.hive.HiveContext(sc) 
> 
> HiveContext.sql("use oraclehadoop")
> val rs = HiveContext.sql("""SELECT t.calendar_month_desc, c.channel_desc, 
> SUM(s.amount_sold) AS TotalSales
> FROM smallsales s, times t, channels c
> WHERE s.time_id = t.time_id
> AND s.channel_id = c.channel_id
> GROUP BY t.calendar_month_desc, c.channel_desc
> """)
> rs.registerTempTable("tmp") 
> 
> HiveContext.sql("""
> SELECT calendar_month_desc AS MONTH, channel_desc AS CHANNEL, TotalSales
> from tmp
> ORDER BY MONTH, CHANNEL
> """).collect.foreach(println)
> HiveContext.sql("""
> SELECT channel_desc AS CHANNEL, MAX(TotalSales) AS SALES
> FROM tmp
> GROUP BY channel_desc
> order by SALES DESC
> """).collect.foreach(println) 
> 
> -- 
> 
> Dr Mich Talebzadeh
> 
> LinkedIn 
> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>  [1]
> 
> http://talebzadehmich.wordpress.com [2]
> 
> NOTE: The information in this email is proprietary and confidential. This 
> message is for the designated recipient only, if you are not the intended 
> recipient, you should destroy it immediately. Any information in this message 
> shall not be understood as given or endorsed by Cloud Technology Partners 
> Ltd, its subsidiaries or their employees, unless expressly so stated. It is 
> the responsibility of the recipient to ensure that this email is virus free, 
> therefore neither Cloud Technology partners Ltd, its subsidiaries nor their 
> employees accept any responsibility.

-- 

Dr Mich Talebzadeh

LinkedIn
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw

http://talebzadehmich.wordpress.com

NOTE: The information in this email is proprietary and confidential.
This message is for the designated recipient only, if you are not the
intended recipient, you should destroy it immediately. Any information
in this message shall not be understood as given or endorsed by Cloud
Technology Partners Ltd, its subsidiaries or their employees, unless
expressly so stated. It is the responsibility of the recipient to ensure
that this email is virus free, therefore neither Cloud Technology
partners Ltd, its subsidiaries nor their employees accept any
responsibility.

 

Links:
--
[1] https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
[2] http://talebzadehmich.wordpress.com
[3] http://www.slideshare.net/databricks/structuring-spark-dataframes-datasets-and-streaming

Re: Using functional programming rather than SQL

2016-02-22 Thread Koert Kuipers
However, to really enjoy functional programming I assume you also want to
use lambdas in your map and filter, which means you need to convert the
DataFrame to a Dataset, using df.as[SomeCaseClass]. Just be aware that it's
somewhat early days for Dataset.
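
A rough sketch of that, assuming Spark 1.6, the rs DataFrame and HiveContext
from earlier in the thread, and a hypothetical case class matching the rs
columns:

case class SalesRow(calendar_month_desc: String, channel_desc: String, TotalSales: java.math.BigDecimal)

import HiveContext.implicits._           // encoders for case classes and tuples

val ds = rs.as[SalesRow]                 // DataFrame -> Dataset[SalesRow]
val direct = ds.filter(r => r.channel_desc == "Direct Sales")     // a real lambda
               .map(r => (r.calendar_month_desc, r.TotalSales))
direct.take(5).foreach(println)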

On Mon, Feb 22, 2016 at 6:45 PM, Kevin Mellott 
wrote:

> In your example, the *rs* instance should be a DataFrame object. In other
> words, the result of *HiveContext.sql* is a DataFrame that you can
> manipulate using *filter, map, *etc.
>
>
> http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.hive.HiveContext
>
>
> On Mon, Feb 22, 2016 at 5:16 PM, Mich Talebzadeh <
> mich.talebza...@cloudtechnologypartners.co.uk> wrote:
>
>> Hi,
>>
>> I have data stored in Hive tables that I want to do simple manipulation.
>>
>> Currently in Spark I perform the following with getting the result set
>> using SQL from Hive tables, registering as a temporary table in Spark
>>
>> Now Ideally I can get the result set into a DF and work on DF to slice
>> and dice the data using functional programming with filter, map. split etc.
>>
>> I wanted to get some ideas on how to go about it.
>>
>> thanks
>>
>> val HiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
>>
>> HiveContext.sql("use oraclehadoop")
>> val rs = HiveContext.sql("""SELECT t.calendar_month_desc, c.channel_desc,
>> SUM(s.amount_sold) AS TotalSales
>> FROM smallsales s, times t, channels c
>> WHERE s.time_id = t.time_id
>> AND   s.channel_id = c.channel_id
>> GROUP BY t.calendar_month_desc, c.channel_desc
>> """)
>> *rs.registerTempTable("tmp")*
>>
>>
>> HiveContext.sql("""
>> SELECT calendar_month_desc AS MONTH, channel_desc AS CHANNEL, TotalSales
>> from tmp
>> ORDER BY MONTH, CHANNEL
>> """).collect.foreach(println)
>> HiveContext.sql("""
>> SELECT channel_desc AS CHANNEL, MAX(TotalSales)  AS SALES
>> FROM tmp
>> GROUP BY channel_desc
>> order by SALES DESC
>> """).collect.foreach(println)
>>
>>
>> --
>>
>> Dr Mich Talebzadeh
>>
>> LinkedIn  
>> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>> http://talebzadehmich.wordpress.com
>>
>> NOTE: The information in this email is proprietary and confidential. This 
>> message is for the designated recipient only, if you are not the intended 
>> recipient, you should destroy it immediately. Any information in this 
>> message shall not be understood as given or endorsed by Cloud Technology 
>> Partners Ltd, its subsidiaries or their employees, unless expressly so 
>> stated. It is the responsibility of the recipient to ensure that this email 
>> is virus free, therefore neither Cloud Technology partners Ltd, its 
>> subsidiaries nor their employees accept any responsibility.
>>
>>
>>
>


Re: Using functional programming rather than SQL

2016-02-22 Thread Dean Wampler
Kevin gave you the answer you need, but I'd like to comment on your subject
line. SQL is a limited form of FP. Sure, there are no anonymous functions
and other limitations, but it's declarative, like good FP programs should
be, and it offers an important subset of the operators ("combinators") you
want.

Also, on a practical note, use the DataFrame API whenever you can, rather
than dropping down to the RDD API, because the DataFrame API is far more
performant. It's a classic case where restricting your options enables more
aggressive optimizations behind the scenes. Michael Armbrust's talk at Spark
Summit East nicely made this point.
http://www.slideshare.net/databricks/structuring-spark-dataframes-datasets-and-streaming
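
As a small illustration of that point, here is the same filter written both
ways against a hypothetical sales DataFrame; the Column expression is
something Catalyst can inspect and push down, while the RDD closure is opaque
to it:

import org.apache.spark.sql.functions.col

val byChannelDF  = sales.filter(col("channel_id") === 3)   // visible to the optimizer
val byChannelRDD = sales.rdd.filter(row => row.getLong(row.fieldIndex("channel_id")) == 3L)   // opaque Scala closure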

dean

Dean Wampler, Ph.D.
Author: Programming Scala, 2nd Edition
 (O'Reilly)
Typesafe 
@deanwampler 
http://polyglotprogramming.com

On Mon, Feb 22, 2016 at 6:45 PM, Kevin Mellott 
wrote:

> In your example, the *rs* instance should be a DataFrame object. In other
> words, the result of *HiveContext.sql* is a DataFrame that you can
> manipulate using *filter, map, *etc.
>
>
> http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.hive.HiveContext
>
>
> On Mon, Feb 22, 2016 at 5:16 PM, Mich Talebzadeh <
> mich.talebza...@cloudtechnologypartners.co.uk> wrote:
>
>> Hi,
>>
>> I have data stored in Hive tables that I want to do simple manipulation.
>>
>> Currently in Spark I perform the following with getting the result set
>> using SQL from Hive tables, registering as a temporary table in Spark
>>
>> Now Ideally I can get the result set into a DF and work on DF to slice
>> and dice the data using functional programming with filter, map. split etc.
>>
>> I wanted to get some ideas on how to go about it.
>>
>> thanks
>>
>> val HiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
>>
>> HiveContext.sql("use oraclehadoop")
>> val rs = HiveContext.sql("""SELECT t.calendar_month_desc, c.channel_desc,
>> SUM(s.amount_sold) AS TotalSales
>> FROM smallsales s, times t, channels c
>> WHERE s.time_id = t.time_id
>> AND   s.channel_id = c.channel_id
>> GROUP BY t.calendar_month_desc, c.channel_desc
>> """)
>> *rs.registerTempTable("tmp")*
>>
>>
>> HiveContext.sql("""
>> SELECT calendar_month_desc AS MONTH, channel_desc AS CHANNEL, TotalSales
>> from tmp
>> ORDER BY MONTH, CHANNEL
>> """).collect.foreach(println)
>> HiveContext.sql("""
>> SELECT channel_desc AS CHANNEL, MAX(TotalSales)  AS SALES
>> FROM tmp
>> GROUP BY channel_desc
>> order by SALES DESC
>> """).collect.foreach(println)
>>
>>
>> --
>>
>> Dr Mich Talebzadeh
>>
>> LinkedIn  
>> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>> http://talebzadehmich.wordpress.com
>>
>> NOTE: The information in this email is proprietary and confidential. This 
>> message is for the designated recipient only, if you are not the intended 
>> recipient, you should destroy it immediately. Any information in this 
>> message shall not be understood as given or endorsed by Cloud Technology 
>> Partners Ltd, its subsidiaries or their employees, unless expressly so 
>> stated. It is the responsibility of the recipient to ensure that this email 
>> is virus free, therefore neither Cloud Technology partners Ltd, its 
>> subsidiaries nor their employees accept any responsibility.
>>
>>
>>
>


Re: Using functional programming rather than SQL

2016-02-22 Thread Kevin Mellott
In your example, the *rs* instance should be a DataFrame object. In other
words, the result of *HiveContext.sql* is a DataFrame that you can
manipulate using *filter, map, *etc.

http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.hive.HiveContext
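
For example, something along these lines, assuming the rs from your script:

val internet = rs.filter(rs("channel_desc") === "Internet")     // slice rows
                 .select("calendar_month_desc", "TotalSales")   // pick columns
internet.collect.foreach(println)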


On Mon, Feb 22, 2016 at 5:16 PM, Mich Talebzadeh <
mich.talebza...@cloudtechnologypartners.co.uk> wrote:

> Hi,
>
> I have data stored in Hive tables that I want to do simple manipulation.
>
> Currently in Spark I perform the following with getting the result set
> using SQL from Hive tables, registering as a temporary table in Spark
>
> Now Ideally I can get the result set into a DF and work on DF to slice and
> dice the data using functional programming with filter, map. split etc.
>
> I wanted to get some ideas on how to go about it.
>
> thanks
>
> val HiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
>
> HiveContext.sql("use oraclehadoop")
> val rs = HiveContext.sql("""SELECT t.calendar_month_desc, c.channel_desc,
> SUM(s.amount_sold) AS TotalSales
> FROM smallsales s, times t, channels c
> WHERE s.time_id = t.time_id
> AND   s.channel_id = c.channel_id
> GROUP BY t.calendar_month_desc, c.channel_desc
> """)
> *rs.registerTempTable("tmp")*
>
>
> HiveContext.sql("""
> SELECT calendar_month_desc AS MONTH, channel_desc AS CHANNEL, TotalSales
> from tmp
> ORDER BY MONTH, CHANNEL
> """).collect.foreach(println)
> HiveContext.sql("""
> SELECT channel_desc AS CHANNEL, MAX(TotalSales)  AS SALES
> FROM tmp
> GROUP BY channel_desc
> order by SALES DESC
> """).collect.foreach(println)
>
>
> --
>
> Dr Mich Talebzadeh
>
> LinkedIn  
> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
> http://talebzadehmich.wordpress.com
>
> NOTE: The information in this email is proprietary and confidential. This 
> message is for the designated recipient only, if you are not the intended 
> recipient, you should destroy it immediately. Any information in this message 
> shall not be understood as given or endorsed by Cloud Technology Partners 
> Ltd, its subsidiaries or their employees, unless expressly so stated. It is 
> the responsibility of the recipient to ensure that this email is virus free, 
> therefore neither Cloud Technology partners Ltd, its subsidiaries nor their 
> employees accept any responsibility.
>
>
>