Re: How to join an RDD with a hive table?

2016-02-16 Thread swetha kasireddy
How can I use a custom partitioner hashed by userId inside saveAsTable with a
DataFrame?
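
DataFrameWriter exposes no hook for plugging in a custom Partitioner, so the
usual workaround is to control the layout before the write. A minimal sketch,
assuming Spark 1.6+ (for repartition by column expression) and a spark-shell
session where sc exists; all names are illustrative:

import org.apache.spark.sql.functions.udf
import org.apache.spark.sql.hive.HiveContext

val hiveContext = new HiveContext(sc)
import hiveContext.implicits._

// Illustrative user DataFrame
val userDF = sc.parallelize(Seq(
  ("u1", 3, 10), ("u2", 5, 42)
)).toDF("userId", "numSessions", "visitCount")

// Caution: partitionBy on a raw high-cardinality key such as userId creates
// one HDFS directory per user; hashing into a bounded bucket column is safer.
val bucketOf = udf((userId: String) => (userId.hashCode & Int.MaxValue) % 256)

userDF
  .withColumn("bucket", bucketOf($"userId"))
  .repartition($"bucket")        // hash-partitions the rows by bucket
  .write
  .partitionBy("bucket")         // one subdirectory per bucket value
  .saveAsTable("user_table")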


Re: How to join an RDD with a hive table?

2016-02-15 Thread swetha kasireddy
How about saving the DataFrame as a table partitioned by userId? My user
records have userId, number of sessions, visit count, etc. as columns, and the
table should be partitioned by userId. I will need to join the user table,
saved as follows, with an incoming session RDD. The session RDD has a
sessionId and a sessionRecord that contains the userId. So the user data needs
to be saved as a table partitioned by userId using DataFrames, and then joined
with the session RDDs. How can I join a DataFrame saved in HDFS with an
incoming RDD so that not all the records are read, only those for which the
join conditions are met?

df.write.partitionBy('userId').saveAsTable(...)
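
A hedged sketch of the subsequent join (Spark 1.5/1.6-era API; Session,
sessionRDD, and the table name are illustrative): convert the incoming session
RDD to a DataFrame, collect the batch's small set of userIds, and filter the
saved table on them before joining, so that partition pruning limits what is
read:

import org.apache.spark.sql.hive.HiveContext

val hiveContext = new HiveContext(sc)
import hiveContext.implicits._

case class Session(sessionId: String, userId: String)

val sessionDF = sessionRDD.toDF()            // sessionRDD: RDD[Session], assumed
val users = hiveContext.table("user_table")  // the table saved above

// Push the batch's userIds down as a filter; with partitionBy("userId")
// this prunes the scan to the matching partitions only.
val ids = sessionDF.select("userId").distinct().collect().map(_.getString(0))
val pruned = users.where($"userId".isin(ids: _*))

val joined = sessionDF.join(pruned, "userId")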


On Mon, Feb 15, 2016 at 10:09 AM, Mich Talebzadeh <
mich.talebza...@cloudtechnologypartners.co.uk> wrote:

>
>
> It depends on how many columns you need from the tables for your queries,
> and on the potential number of rows.
>
> From my experience, I don't believe that registering a table as temporary
> means it is going to cache the whole 1 billion rows into memory. That would
> not make sense (I stand to be corrected); only a fraction of the rows and
> columns will be needed. (See the caching sketch after this message.)
>
> It will be interesting to know how Catalyst handles this. I suspect it
> behaves much like any data cache in a relational database, with some form
> of MRU-LRU chain where rows are read into memory from the blocks, processed,
> and discarded to make room for new ones. If the memory is not big enough,
> the operation spills to disk.
>
> I just did a test on three tables in Hive with Spark 1.5.2, using
> DataFrames and temp tables.
>
> The FACT table had 1 billion rows as follows:
>
>
> CREATE TABLE `sales_staging`(
>   `prod_id` bigint,
>   `cust_id` bigint,
>   `time_id` timestamp,
>   `channel_id` bigint,
>   `promo_id` bigint,
>   `quantity_sold` decimal(10,0),
>   `amount_sold` decimal(10,0))
> ROW FORMAT SERDE
>   'org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe'
> STORED AS INPUTFORMAT
>   'org.apache.hadoop.mapred.TextInputFormat'
> OUTPUTFORMAT
>   'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
> LOCATION
>   'hdfs://rhes564:9000/user/hive/warehouse/oraclehadoop.db/sales_staging'
> TBLPROPERTIES (
>   'COLUMN_STATS_ACCURATE'='true',
>   'last_modified_by'='hduser',
>   'last_modified_time'='1451305601',
>   'numFiles'='4',
>   'numRows'='10',
>   'rawDataSize'='46661545000',
>   'totalSize'='47661545000',
>   'transient_lastDdlTime'='1451767601')
>
>
>
> The other dimension tables were tiny. It took 13 minutes to get the first
> 10 rows back, requiring only a few columns of interest, so I don't think it
> was loading 1 billion rows into memory from the sales_staging table.
>
> Started at
>
> [15/02/2016 17:47:28.28]
>
> s: org.apache.spark.sql.DataFrame = [AMOUNT_SOLD: decimal(10,0), TIME_ID: timestamp, CHANNEL_ID: bigint]
> c: org.apache.spark.sql.DataFrame = [CHANNEL_ID: double, CHANNEL_DESC: string]
> t: org.apache.spark.sql.DataFrame = [TIME_ID: timestamp, CALENDAR_MONTH_DESC: string]
>
> sqltext: String = ""
> sqltext: String =
>
> SELECT rs.Month, rs.SalesChannel, round(TotalSales,2)
> FROM
> (
>   SELECT t_t.CALENDAR_MONTH_DESC AS Month, t_c.CHANNEL_DESC AS SalesChannel,
>          SUM(t_s.AMOUNT_SOLD) AS TotalSales
>   FROM t_s, t_t, t_c
>   WHERE t_s.TIME_ID = t_t.TIME_ID
>   AND   t_s.CHANNEL_ID = t_c.CHANNEL_ID
>   GROUP BY t_t.CALENDAR_MONTH_DESC, t_c.CHANNEL_DESC
>   ORDER BY t_t.CALENDAR_MONTH_DESC, t_c.CHANNEL_DESC
> ) rs
> LIMIT 10
>
> [1998-01,Direct Sales,1823005210]
> [1998-01,Internet,248172522]
> [1998-01,Partners,474646900]
> [1998-02,Direct Sales,1819659036]
> [1998-02,Internet,298586496]
> ...
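
To make the point about temp tables concrete: registerTempTable only registers
a logical plan, so nothing is read or cached until asked for. A minimal sketch
(Spark 1.5-era API, following the naming in the quoted code):

HiveContext.sql("SELECT AMOUNT_SOLD, TIME_ID, CHANNEL_ID FROM oraclehadoop.sales")
  .registerTempTable("t_s")        // lazy: registers a plan, reads no data

HiveContext.cacheTable("t_s")      // opt-in: materialized (lazily) on first use
// ... run queries against t_s ...
HiveContext.uncacheTable("t_s")    // release the memory when done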

Re: How to join an RDD with a hive table?

2016-02-15 Thread swetha kasireddy
OK. Would it query only the records I want in Hive, as per the filter, or just
load the entire table? My user table will have millions of records, and I do
not want to cause OOM errors by loading the entire table into memory.
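
One way to check is to look at the physical plan: filters on stored columns
are pushed into the scan (and, on a partitioned table, prune partitions), so
only matching data should be read. A hedged sketch with illustrative names,
assuming a hiveContext with its implicits imported:

val users = hiveContext.table("user_table")
val wanted = users.where($"userId".isin("u1", "u2", "u3"))

wanted.explain(true)   // look for pushed filters / partition pruning in the plan
wanted.show()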



RE: How to join an RDD with a hive table?

2016-02-15 Thread Mich Talebzadeh
It is also worthwhile using temporary tables for the join query.

 

I can join a Hive table with any JDBC-accessed table from another database,
using DataFrames and temporary tables:

 

//
// Get the FACT table from Hive
//
var s = HiveContext.sql("SELECT AMOUNT_SOLD, TIME_ID, CHANNEL_ID FROM oraclehadoop.sales")

//
// Get the Dimension table from Oracle via JDBC
//
val c = HiveContext.load("jdbc",
  Map("url" -> "jdbc:oracle:thin:@rhes564:1521:mydb",
      "dbtable" -> "(SELECT to_char(CHANNEL_ID) AS CHANNEL_ID, CHANNEL_DESC FROM sh.channels)",
      "user" -> "sh",
      "password" -> "xxx"))

s.registerTempTable("t_s")
c.registerTempTable("t_c")

 

And do the join

 

sqltext = """
SELECT rs.Month, rs.SalesChannel, round(TotalSales,2)
FROM
(
  SELECT t_t.CALENDAR_MONTH_DESC AS Month, t_c.CHANNEL_DESC AS SalesChannel,
         SUM(t_s.AMOUNT_SOLD) AS TotalSales
  FROM t_s, t_t, t_c
  WHERE t_s.TIME_ID = t_t.TIME_ID
  AND   t_s.CHANNEL_ID = t_c.CHANNEL_ID
  GROUP BY t_t.CALENDAR_MONTH_DESC, t_c.CHANNEL_DESC
  ORDER BY t_t.CALENDAR_MONTH_DESC, t_c.CHANNEL_DESC
) rs
LIMIT 1000
"""

HiveContext.sql(sqltext).collect.foreach(println)
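
For comparison, a hedged sketch of the same join through the DataFrame API,
with no temp tables. It assumes s and c as above, plus a time-dimension
DataFrame t whose creation is elided in the original mail:

import org.apache.spark.sql.functions.{round, sum}

val rs = s.join(t, s("TIME_ID") === t("TIME_ID"))
          .join(c, s("CHANNEL_ID") === c("CHANNEL_ID"))
          .groupBy(t("CALENDAR_MONTH_DESC"), c("CHANNEL_DESC"))
          .agg(round(sum(s("AMOUNT_SOLD")), 2).as("TotalSales"))
          .orderBy(t("CALENDAR_MONTH_DESC"), c("CHANNEL_DESC"))

rs.show(1000)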

 

HTH

 

Dr Mich Talebzadeh

 

LinkedIn: https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw

http://talebzadehmich.wordpress.com

 




Re: How to join an RDD with a hive table?

2016-02-15 Thread Ted Yu
Have you tried creating a DataFrame from the RDD and joining it with the
DataFrame that corresponds to the Hive table?
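
A minimal sketch of that approach (Spark 1.5-era API; Event, eventRDD, and the
table name are illustrative, not from the thread):

case class Event(id: Long, payload: String)

val hiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
import hiveContext.implicits._

val rddDF  = eventRDD.toDF()                     // eventRDD: RDD[Event], assumed
val hiveDF = hiveContext.table("mydb.big_table") // the Hive table as a DataFrame

// Inner join by id: only the matching Hive rows survive the join
val joined = rddDF.join(hiveDF, rddDF("id") === hiveDF("id"))
joined.show()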

On Sun, Feb 14, 2016 at 9:53 PM, SRK  wrote:

> Hi,
>
> How can I join an RDD with a Hive table and retrieve only the records I am
> interested in? Suppose I have an RDD with 1,000 records and a Hive table
> with 100,000 records. I should be able to join the RDD with the Hive table
> by an Id, and to load only those 1,000 records from the Hive table so that
> there are no memory issues. Also, I was planning on storing the data in
> Hive in the form of Parquet files. Any help on this is greatly appreciated.
>
> Thanks!