Spark has its own efficient in memory columnar format. So it's not ORC.
It's just that the data has to be serialized and deserialized over the
network. And that is consuming time.

Regards
Sab
On 24-Feb-2016 9:50 pm, "Mich Talebzadeh" <
mich.talebza...@cloudtechnologypartners.co.uk> wrote:

>
>
> *Hi,*
>
> *Tools*
>
> *Spark 1.5.2, Hadoop 2.6, Hive 2.0, Spark-Shell, Hive Database*
>
> *Objectives: Timing differences between running Spark using SQL and
> running Spark using functional programing (FP) (functional calls) on Hive
> tables*
>
> *Underlying tables: Three tables in Hive database using ORC format*
>
> The main differences in timings come from running the queries and fetching
> data. If you look the transformation part that is
>
> val rs =
> s.join(t,"time_id").join(c,"channel_id").groupBy("calendar_month_desc","channel_desc").agg(sum("amount_sold").as("TotalSales"))
>
> Takes I second. On the other hand using SQL the query 1 takes 19 seconds
> compared to just under 4 minutes for functional programming
>
> The seconds query using SQL takes 28 seconds. Using FP it takes around 4
> minutes.
>
> These are my assumptions.
>
>    1. Running SQL the full query is executed in Hive which means that
>    Hive can take advantage of ORC optimization/storage index etc?
>    2. Running FP requires that data is fetched from the underlying tables
>    in Hive and brought back to Spark cluster (standalone here) and the joins
>    etc are done there
>
> The next step for me would be to:
>
>    1. Look at the query plans in Spark
>    2. Run the same code on Hive alone and compare results
>
>
>
> Any other suggestions are welcome.
>
> *Standard SQL code*
>
> val HiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
> println ("\nStarted at"); HiveContext.sql("SELECT
> FROM_unixtime(unix_timestamp(), 'dd/MM/yyyy HH:mm:ss.ss')
> ").collect.foreach(println)
> HiveContext.sql("use oraclehadoop")
> println ("\ncreating data set at "); HiveContext.sql("SELECT
> FROM_unixtime(unix_timestamp(), 'dd/MM/yyyy HH:mm:ss.ss')
> ").collect.foreach(println)
> val rs = HiveContext.sql(
> """
> SELECT    t.calendar_month_desc
>         , c.channel_desc
>         , SUM(s.amount_sold) AS TotalSales
> FROM smallsales s
> INNER JOIN times t
> ON s.time_id = t.time_id
> INNER JOIN channels c
> ON s.channel_id = c.channel_id
> GROUP BY t.calendar_month_desc, c.channel_desc
> """)
> rs.registerTempTable("tmp")
> println ("\nfirst query at"); HiveContext.sql("SELECT
> FROM_unixtime(unix_timestamp(), 'dd/MM/yyyy HH:mm:ss.ss')
> ").collect.foreach(println)
> HiveContext.sql("""
> SELECT calendar_month_desc AS MONTH, channel_desc AS CHANNEL, TotalSales
> from tmp
> ORDER BY MONTH, CHANNEL LIMIT 5
> """).collect.foreach(println)
> println ("\nsecond query at"); HiveContext.sql("SELECT
> FROM_unixtime(unix_timestamp(), 'dd/MM/yyyy HH:mm:ss.ss')
> ").collect.foreach(println)
> HiveContext.sql("""
> SELECT channel_desc AS CHANNEL, MAX(TotalSales)  AS SALES
> FROM tmp
> GROUP BY channel_desc
> order by SALES DESC LIMIT 5
> """).collect.foreach(println)
> println ("\nFinished at"); HiveContext.sql("SELECT
> FROM_unixtime(unix_timestamp(), 'dd/MM/yyyy HH:mm:ss.ss')
> ").collect.foreach(println)
> sys.exit
>
> *Results*
>
> Started at [24/02/2016 09:00:50.50]
> res1: org.apache.spark.sql.DataFrame = [result: string]
>
> creating data set at [24/02/2016 09:00:53.53]
> rs: org.apache.spark.sql.DataFrame = [calendar_month_desc: string,
> channel_desc: string, TotalSales: decimal(20,0)
>
> First query at [24/02/2016 09:00:54.54]
> [1998-01,Direct Sales,9161730]
> [1998-01,Internet,1248581]
> [1998-01,Partners,2409776]
> [1998-02,Direct Sales,9161840]
> [1998-02,Internet,1533193]
>
> second query at [24/02/2016 09:01:13.13]
> [Direct Sales,9161840]
> [Internet,3977374]
> [Partners,3976291]
> [Tele Sales,328760]
>
> Finished at [24/02/2016 09:01:31.31
>
> *Code using functional programming*
>
> val HiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
> println ("\nStarted at"); HiveContext.sql("SELECT
> FROM_unixtime(unix_timestamp(), 'dd/MM/yyyy HH:mm:ss.ss')
> ").collect.foreach(println)
> HiveContext.sql("use oraclehadoop")
> var s =
> HiveContext.table("sales").select("AMOUNT_SOLD","TIME_ID","CHANNEL_ID")
> val c = HiveContext.table("channels").select("CHANNEL_ID","CHANNEL_DESC")
> val t = HiveContext.table("times").select("TIME_ID","CALENDAR_MONTH_DESC")
> println ("\ncreating data set at"); HiveContext.sql("SELECT
> FROM_unixtime(unix_timestamp(), 'dd/MM/yyyy HH:mm:ss.ss')
> ").collect.foreach(println)
> val rs =
> s.join(t,"time_id").join(c,"channel_id").groupBy("calendar_month_desc","channel_desc").agg(sum("amount_sold").as("TotalSales"))
> println ("\nfirst query at"); HiveContext.sql("SELECT
> FROM_unixtime(unix_timestamp(), 'dd/MM/yyyy HH:mm:ss.ss')
> ").collect.foreach(println)
> val rs1 =
> rs.orderBy("calendar_month_desc","channel_desc").take(5).foreach(println)
> println ("\nsecond query at"); HiveContext.sql("SELECT
> FROM_unixtime(unix_timestamp(), 'dd/MM/yyyy HH:mm:ss.ss')
> ").collect.foreach(println)
> val rs2
> =rs.groupBy("channel_desc").agg(max("TotalSales").as("SALES")).orderBy("SALES").sort(desc("SALES")).take(5).foreach(println)
> println ("\nFinished at"); HiveContext.sql("SELECT
> FROM_unixtime(unix_timestamp(), 'dd/MM/yyyy HH:mm:ss.ss')
> ").collect.foreach(println)
> sys.exit
>
> *Results*
>
> Started at [24/02/2016 08:52:27.27]
> res1: org.apache.spark.sql.DataFrame = [result: string]
> s: org.apache.spark.sql.DataFrame = [AMOUNT_SOLD: decimal(10,0), TIME_ID:
> timestamp, CHANNEL_ID: bigint]
> c: org.apache.spark.sql.DataFrame = [CHANNEL_ID: double, CHANNEL_DESC:
> string]
> t: org.apache.spark.sql.DataFrame = [TIME_ID: timestamp,
> CALENDAR_MONTH_DESC: string]
>
> creating data set at [24/02/2016 08:52:30.30]
> rs: org.apache.spark.sql.DataFrame = [calendar_month_desc: string,
> channel_desc: string, TotalSales: decimal(20,0)]
>
> first query at [24/02/2016 08:52:31.31]
> [1998-01,Direct Sales,9086830]
> [1998-01,Internet,1247641]
> [1998-01,Partners,2393567]
> [1998-02,Direct Sales,9161840]
> [1998-02,Internet,1533193]
> rs1: Unit = ()
>
> second query at [24/02/2016 08:56:17.17]
> [Direct Sales,9161840]
> [Internet,3977374]
> [Partners,3976291]
> [Tele Sales,328760]
> rs2: Unit = ()
>
> Finished at
> [24/02/2016 09:00:14.14]
>
>
>
> On 24/02/2016 06:27, Sabarish Sasidharan wrote:
>
> When using SQL your full query, including the joins, were executed in
> Hive(or RDBMS) and only the results were brought into the Spark cluster. In
> the FP case, the data for the 3 tables is first pulled into the Spark
> cluster and then the join is executed.
>
> Thus the time difference.
>
> It's not immediately obvious why the results are different.
>
> Regards
> Sab
>
>
>

Reply via email to