How do I use a custom Partitioner hashed by userId inside saveAsTable when
using a DataFrame?
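
As far as I know, DataFrameWriter does not take a custom Partitioner directly.
A minimal sketch of two approximations, assuming Spark 1.5/1.6 and a
HiveContext named hc (the table and column names here are placeholders, not
from this thread):

import org.apache.spark.sql.functions.col

val userDF = hc.table("users")   // userId, numberOfSessions, visitCount, ...

// Option 1 (Spark 1.6+): hash-repartition on userId before writing, so all
// rows for a given userId land in the same task/output file.
userDF.repartition(200, col("userId"))
      .write
      .saveAsTable("users_by_userid")

// Option 2: let Hive lay the table out by userId on disk (one directory per
// value; only sensible if userId cardinality is modest).
userDF.write.partitionBy("userId").saveAsTable("users_partitioned")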

On Mon, Feb 15, 2016 at 11:24 AM, swetha kasireddy <
swethakasire...@gmail.com> wrote:

> How about saving the dataframe as a table partitioned by userId? My user
> records have userId, number of sessions, visit count etc. as the columns,
> and the table should be partitioned by userId. I will need to join the user
> table saved in the database, as follows, with an incoming session RDD. The
> session RDD would have a sessionId and a sessionRecord which contains the
> userId. So I need to save the user data as a table using DataFrames,
> partitioned by userId, and then join it with session RDDs. How can I join a
> DataFrame saved in HDFS with an incoming RDD so that not all the records
> are read, but only the records for which the join conditions are met?
>
> df.write.partitionBy("userId").saveAsTable(...)
>
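> A hedged sketch of that flow, assuming Spark 1.5+, a HiveContext, and a case
> class for the incoming sessions (all names here are illustrative):
>
> import org.apache.spark.rdd.RDD
> import org.apache.spark.sql.DataFrame
> import org.apache.spark.sql.hive.HiveContext
>
> case class Session(sessionId: String, userId: String)
>
> val hc = new HiveContext(sc)
> import hc.implicits._
>
> def joinWithUsers(sessions: RDD[Session]): DataFrame = {
>   val sessionDF = sessions.toDF()
>   val userDF    = hc.table("users_partitioned")  // written with partitionBy("userId")
>
>   // A plain join would still scan the whole table (with column pruning).
>   // For a small session batch, filtering on the distinct userIds first lets
>   // Spark prune the userId partitions, so only matching data is read.
>   val ids = sessionDF.select("userId").distinct().collect().map(_.getString(0))
>   userDF.filter($"userId".isin(ids: _*)).join(sessionDF, "userId")
> }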
>
> On Mon, Feb 15, 2016 at 10:09 AM, Mich Talebzadeh <
> mich.talebza...@cloudtechnologypartners.co.uk> wrote:
>
>>
>>
>> It depends on how many columns you need from tables for your queries and
>> potential number of rows.
>>
>> From my experience, I don't believe that registering a table as temporary
>> means it is going to cache the whole 1 billion rows into memory. That would
>> not make sense (I stand to be corrected). Only a fraction of the rows and
>> columns will be needed.
>>
>> It will be interesting to know how Catalyst handles this. I suspect it
>> behaves much like the data cache in a relational database, with some form
>> of MRU-LRU chain where rows are read into memory from the blocks, processed,
>> and discarded to make room for new ones. If memory is not big enough, the
>> operation spills to disk.
>>
>> I just did a test on three tables in Hive with Spark 1.5.2, using
>> DataFrames and tempTables.
>>
>> The FACT table had 1 billion rows as follows:
>>
>>
>> CREATE TABLE `sales_staging`(
>>   `prod_id` bigint,
>>   `cust_id` bigint,
>>   `time_id` timestamp,
>>   `channel_id` bigint,
>>   `promo_id` bigint,
>>   `quantity_sold` decimal(10,0),
>>   `amount_sold` decimal(10,0))
>> ROW FORMAT SERDE
>>   'org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe'
>> STORED AS INPUTFORMAT
>>   'org.apache.hadoop.mapred.TextInputFormat'
>> OUTPUTFORMAT
>>   'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
>> LOCATION
>>   'hdfs://rhes564:9000/user/hive/warehouse/oraclehadoop.db/sales_staging'
>> TBLPROPERTIES (
>>   'COLUMN_STATS_ACCURATE'='true',
>>   'last_modified_by'='hduser',
>>   'last_modified_time'='1451305601',
>>   'numFiles'='4',
>>   'numRows'='1000000000',
>>   'rawDataSize'='46661545000',
>>   'totalSize'='47661545000',
>>   'transient_lastDdlTime'='1451767601')
>>
>>
>>
>> The other dimension tables were tiny. It took 13 minutes to get the first
>> 10 rows back, but the query only needed the few columns of interest, so I
>> don't think it was loading 1 billion rows into memory from the
>> sales_staging table.
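>>
>> A quick way to check that (a sketch in the same Spark 1.5 style) is to
>> print the physical plan, which shows which columns the scan actually reads
>> and any pushed-down filters:
>>
>> val df = HiveContext.sql(
>>   "SELECT AMOUNT_SOLD, TIME_ID, CHANNEL_ID FROM oraclehadoop.sales_staging")
>> df.explain(true)   // inspect the scan node for the projected column list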
>>
>> Started at
>>
>> [15/02/2016 17:47:28.28]
>>
>> s: org.apache.spark.sql.DataFrame = [AMOUNT_SOLD: decimal(10,0), TIME_ID:
>> timestamp, CHANNEL_ID: bigint]
>>
>> c: org.apache.spark.sql.DataFrame = [CHANNEL_ID: double, CHANNEL_DESC:
>> string]
>>
>> t: org.apache.spark.sql.DataFrame = [TIME_ID: timestamp,
>> CALENDAR_MONTH_DESC: string]
>>
>> sqltext: String = ""
>>
>> sqltext: String =
>>
>> SELECT rs.Month, rs.SalesChannel, round(TotalSales,2)
>>
>> FROM
>>
>> (
>>
>> SELECT t_t.CALENDAR_MONTH_DESC AS Month, t_c.CHANNEL_DESC AS
>> SalesChannel, SUM(t_s.AMOUNT_SOLD) AS TotalSales
>>
>> FROM t_s, t_t, t_c
>>
>> WHERE t_s.TIME_ID = t_t.TIME_ID
>>
>> AND   t_s.CHANNEL_ID = t_c.CHANNEL_ID
>>
>> GROUP BY t_t.CALENDAR_MONTH_DESC, t_c.CHANNEL_DESC
>>
>> ORDER by t_t.CALENDAR_MONTH_DESC, t_c.CHANNEL_DESC
>>
>> ) rs
>>
>> LIMIT 10
>>
>>
>>
>> [1998-01,Direct Sales,1823005210]
>>
>> [1998-01,Internet,248172522]
>>
>> [1998-01,Partners,474646900]
>>
>> [1998-02,Direct Sales,1819659036]
>>
>> [1998-02,Internet,298586496]
>>
>> [1998-02,Partners,534103258]
>>
>> [1998-03,Direct Sales,1405805622]
>>
>> [1998-03,Internet,229163168]
>>
>> [1998-03,Partners,352277328]
>>
>> [1998-03,Tele Sales,59700082]
>>
>>  Finished at
>>
>> [15/02/2016 18:00:50.50]
>>
>>
>>
>> On 15/02/2016 17:27, swetha kasireddy wrote:
>>
>> OK. Would it only query for the records that I want in Hive, as per the
>> filter, or would it load the entire table? My user table will have millions
>> of records, and I do not want to cause OOM errors by loading the entire
>> table in memory.
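>>
>> If the keys of interest are known up front, one way (a hedged sketch with
>> made-up names) to avoid materialising the whole table is to broadcast the
>> small side of the join, so the big table is streamed through once and never
>> collected into memory:
>>
>> import org.apache.spark.sql.functions.broadcast
>>
>> val wantedIds = sessionDF.select("userId").distinct()   // small
>> val users     = hc.table("users")                       // millions of rows
>>
>> // Broadcast hash join: only rows whose userId appears in wantedIds survive,
>> // and with Parquet/ORC storage the unused columns are not read at all.
>> users.join(broadcast(wantedIds), "userId")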
>>
>> On Mon, Feb 15, 2016 at 12:51 AM, Mich Talebzadeh <m...@peridale.co.uk>
>> wrote:
>>
>>> It is also worthwhile using temporary tables for the join query.
>>>
>>>
>>>
>>> I can join a Hive table with any other JDBC-accessed table from any
>>> other database using DataFrames and temporary tables:
>>>
>>>
>>>
>>> //
>>>
>>> //Get the FACT table from Hive
>>>
>>> //
>>>
>>> var s = HiveContext.sql("SELECT AMOUNT_SOLD, TIME_ID, CHANNEL_ID FROM
>>> oraclehadoop.sales")
>>>
>>>
>>>
>>> //
>>>
>>> //Get the Dimension table from Oracle via JDBC
>>>
>>> //
>>>
>>> val c = HiveContext.load("jdbc",
>>>
>>> Map("url" -> "jdbc:oracle:thin:@rhes564:1521:mydb",
>>>
>>> "dbtable" -> "(SELECT to_char(CHANNEL_ID) AS CHANNEL_ID, CHANNEL_DESC
>>> FROM sh.channels)",
>>>
>>> "user" -> "sh",
>>>
>>> "password" -> "xxx"))
>>>
>>>
>>>
>>>
>>>
>>> s.registerTempTable("t_s")
>>>
>>> c.registerTempTable("t_c")
>>>
>>> // (the time dimension DataFrame t is registered as t_t in the same way)
>>>
>>>
>>>
>>> And do the join:
>>>
>>>
>>> var sqltext = """
>>> SELECT rs.Month, rs.SalesChannel, round(TotalSales,2)
>>>
>>> FROM
>>>
>>> (
>>>
>>> SELECT t_t.CALENDAR_MONTH_DESC AS Month, t_c.CHANNEL_DESC AS
>>> SalesChannel, SUM(t_s.AMOUNT_SOLD) AS TotalSales
>>>
>>> FROM t_s, t_t, t_c
>>>
>>> WHERE t_s.TIME_ID = t_t.TIME_ID
>>>
>>> AND   t_s.CHANNEL_ID = t_c.CHANNEL_ID
>>>
>>> GROUP BY t_t.CALENDAR_MONTH_DESC, t_c.CHANNEL_DESC
>>>
>>> ORDER by t_t.CALENDAR_MONTH_DESC, t_c.CHANNEL_DESC
>>>
>>> ) rs
>>>
>>> LIMIT 1000
>>>
>>> """
>>>
>>> HiveContext.sql(sqltext).collect.foreach(println)
>>>
>>>
>>>
>>> HTH
>>>
>>>
>>>
>>> Dr Mich Talebzadeh
>>>
>>>
>>>
>>> LinkedIn
>>> https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>>>
>>>
>>>
>>> http://talebzadehmich.wordpress.com
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>> From: Ted Yu [mailto:yuzhih...@gmail.com]
>>> Sent: 15 February 2016 08:44
>>> To: SRK <swethakasire...@gmail.com>
>>> Cc: user <user@spark.apache.org>
>>> Subject: Re: How to join an RDD with a hive table?
>>>
>>>
>>>
>>> Have you tried creating a DataFrame from the RDD and joining it with the
>>> DataFrame that corresponds to the Hive table?
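>>>
>>> A minimal sketch of that suggestion (Spark 1.5-era API; the case class,
>>> RDD and table names are placeholders):
>>>
>>> import org.apache.spark.sql.hive.HiveContext
>>>
>>> case class Record(id: Long, payload: String)
>>>
>>> val hiveCtx = new HiveContext(sc)
>>> import hiveCtx.implicits._
>>>
>>> val rddDF   = myRdd.toDF()                             // myRdd: RDD[Record]
>>> val tableDF = hiveCtx.sql("SELECT * FROM mydb.mytable") // the Hive table as a DataFrame
>>>
>>> val joined = rddDF.join(tableDF, rddDF("id") === tableDF("id"))
>>> joined.show()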
>>>
>>>
>>>
>>> On Sun, Feb 14, 2016 at 9:53 PM, SRK <swethakasire...@gmail.com> wrote:
>>>
>>> Hi,
>>>
>>> How do I join an RDD with a Hive table and retrieve only the records that
>>> I am interested in? Suppose I have an RDD with 1,000 records and a Hive
>>> table with 100,000 records. I should be able to join the RDD with the Hive
>>> table by an id and load only those 1,000 records from the Hive table, so
>>> that there are no memory issues. Also, I was planning on storing the data
>>> in Hive in the form of Parquet files. Any help on this is greatly
>>> appreciated.
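>>>
>>> For the Parquet part, a hedged sketch (names are illustrative): writing
>>> the table in Parquet format lets later scans benefit from column pruning
>>> and predicate pushdown, so a filtered join touches far less data:
>>>
>>> import org.apache.spark.sql.functions.col
>>>
>>> // Save the table Parquet-backed, then read it with a filter that Spark
>>> // can push down to the Parquet reader.
>>> userDF.write.format("parquet").saveAsTable("users_parquet")
>>>
>>> val filtered = hiveCtx.table("users_parquet")
>>>                       .filter(col("userId") === "someUserId")  // placeholder id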
>>>
>>> Thanks!
>>>
>>>
>>>
>>> --
>>> View this message in context:
>>> http://apache-spark-user-list.1001560.n3.nabble.com/How-to-join-an-RDD-with-a-hive-table-tp26225.html
>>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>>
>>>
>>>
>>>
>>
>>
>>
>> --
>>
>> Dr Mich Talebzadeh
>>
>> LinkedIn  
>> https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>> http://talebzadehmich.wordpress.com
>>
>>
>>
>>
>
