How to use a custom partitioner hashed by userId inside saveAsTable using a dataframe?
On Mon, Feb 15, 2016 at 11:24 AM, swetha kasireddy <swethakasire...@gmail.com> wrote:

How about saving the dataframe as a table partitioned by userId? My User records have userId, number of sessions, visit count etc. as the columns, and the table should be partitioned by userId. I will need to join the user table saved in the database with an incoming session RDD. The session RDD has a sessionId and a sessionRecord which contains the userId. So the user data needs to be saved as a table using dataframes, partitioned by userId, and then joined with the session RDDs. How can I join a dataframe saved in HDFS with an incoming RDD so that not all the records are read, and only the records for which the join conditions are met are read?

df.write.partitionBy('userId').saveAsTable(...)
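For what the subject line is asking, a minimal sketch in Scala (Spark 1.x API) might look like the following. It assumes a HiveContext named sqlContext, a user DataFrame userDF, and an incoming RDD sessionRDD of (sessionId, sessionRecord) pairs; the table name user_table and the field rec.userId are illustrative, not taken from the thread:

import org.apache.spark.sql.functions.broadcast
import sqlContext.implicits._

// Write the user records as a table partitioned by userId, so each
// distinct userId lands in its own partition directory.
userDF.write.partitionBy("userId").saveAsTable("user_table")

// Turn the incoming (sessionId, sessionRecord) RDD into a DataFrame of
// the join keys so it can be joined against the saved table.
val sessionDF = sessionRDD
  .map { case (sessionId, rec) => (sessionId, rec.userId) }
  .toDF("sessionId", "userId")

// Broadcasting the small session side avoids shuffling the large user table.
val userTable = sqlContext.table("user_table")
val joined = userTable.join(broadcast(sessionDF), "userId")

One caveat: partitionBy on a high-cardinality column such as userId creates one directory per distinct value, which is unlikely to scale to millions of users; partitioning by a coarser derived key (for example a hash bucket of userId) is usually the more practical layout.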
On Mon, Feb 15, 2016 at 10:09 AM, Mich Talebzadeh <mich.talebza...@cloudtechnologypartners.co.uk> wrote:

It depends on how many columns you need from the tables for your queries and on the potential number of rows.

From my experience I don't believe that registering a table as temporary means the whole 1 billion rows are cached in memory. That would not make sense (I stand corrected). Only a fraction of the rows and columns will be needed.

It will be interesting to know how Catalyst handles this. I suspect it behaves much like the data cache in a relational database, with some form of MRU-LRU chain where rows are read into memory from the blocks, processed, and discarded to make room for new ones. If the memory is not big enough, the operation spills to disk.

I just did a test on three tables in Hive with Spark 1.5.2 using DataFrames and tempTables. The FACT table had 1 billion rows:

CREATE TABLE `sales_staging`(
  `prod_id` bigint,
  `cust_id` bigint,
  `time_id` timestamp,
  `channel_id` bigint,
  `promo_id` bigint,
  `quantity_sold` decimal(10,0),
  `amount_sold` decimal(10,0))
ROW FORMAT SERDE
  'org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe'
STORED AS INPUTFORMAT
  'org.apache.hadoop.mapred.TextInputFormat'
OUTPUTFORMAT
  'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
LOCATION
  'hdfs://rhes564:9000/user/hive/warehouse/oraclehadoop.db/sales_staging'
TBLPROPERTIES (
  'COLUMN_STATS_ACCURATE'='true',
  'last_modified_by'='hduser',
  'last_modified_time'='1451305601',
  'numFiles'='4',
  'numRows'='1000000000',
  'rawDataSize'='46661545000',
  'totalSize'='47661545000',
  'transient_lastDdlTime'='1451767601')

The other dimension tables were tiny. It took 13 minutes to get the first 10 rows back, requiring only a few columns of interest. So I don't think it was loading 1 billion rows into memory from the sales_staging table.

Started at [15/02/2016 17:47:28.28]

s: org.apache.spark.sql.DataFrame = [AMOUNT_SOLD: decimal(10,0), TIME_ID: timestamp, CHANNEL_ID: bigint]
c: org.apache.spark.sql.DataFrame = [CHANNEL_ID: double, CHANNEL_DESC: string]
t: org.apache.spark.sql.DataFrame = [TIME_ID: timestamp, CALENDAR_MONTH_DESC: string]
sqltext: String = ""
sqltext: String =
SELECT rs.Month, rs.SalesChannel, round(TotalSales,2)
FROM
(
  SELECT t_t.CALENDAR_MONTH_DESC AS Month, t_c.CHANNEL_DESC AS SalesChannel, SUM(t_s.AMOUNT_SOLD) AS TotalSales
  FROM t_s, t_t, t_c
  WHERE t_s.TIME_ID = t_t.TIME_ID
  AND t_s.CHANNEL_ID = t_c.CHANNEL_ID
  GROUP BY t_t.CALENDAR_MONTH_DESC, t_c.CHANNEL_DESC
  ORDER BY t_t.CALENDAR_MONTH_DESC, t_c.CHANNEL_DESC
) rs
LIMIT 10

[1998-01,Direct Sales,1823005210]
[1998-01,Internet,248172522]
[1998-01,Partners,474646900]
[1998-02,Direct Sales,1819659036]
[1998-02,Internet,298586496]
[1998-02,Partners,534103258]
[1998-03,Direct Sales,1405805622]
[1998-03,Internet,229163168]
[1998-03,Partners,352277328]
[1998-03,Tele Sales,59700082]

Finished at [15/02/2016 18:00:50.50]

On 15/02/2016 17:27, swetha kasireddy wrote:

OK. Would it only query for the records that I want in Hive as per the filter, or just load the entire table? My user table will have millions of records and I do not want to cause OOM errors by loading the entire table into memory.

On Mon, Feb 15, 2016 at 12:51 AM, Mich Talebzadeh <m...@peridale.co.uk> wrote:

Also worthwhile using temporary tables for the join query.

I can join a Hive table with any other JDBC-accessed table from any other database with DataFrames and temporary tables:

//
// Get the FACT table from Hive
//
var s = HiveContext.sql("SELECT AMOUNT_SOLD, TIME_ID, CHANNEL_ID FROM oraclehadoop.sales")

//
// Get the dimension table from Oracle via JDBC
//
val c = HiveContext.load("jdbc",
  Map("url" -> "jdbc:oracle:thin:@rhes564:1521:mydb",
      "dbtable" -> "(SELECT to_char(CHANNEL_ID) AS CHANNEL_ID, CHANNEL_DESC FROM sh.channels)",
      "user" -> "sh",
      "password" -> "xxx"))

s.registerTempTable("t_s")
c.registerTempTable("t_c")
// the time dimension is loaded and registered as t_t in the same way (elided here)

And do the join:

var sqltext = """
SELECT rs.Month, rs.SalesChannel, round(TotalSales,2)
FROM
(
  SELECT t_t.CALENDAR_MONTH_DESC AS Month, t_c.CHANNEL_DESC AS SalesChannel, SUM(t_s.AMOUNT_SOLD) AS TotalSales
  FROM t_s, t_t, t_c
  WHERE t_s.TIME_ID = t_t.TIME_ID
  AND t_s.CHANNEL_ID = t_c.CHANNEL_ID
  GROUP BY t_t.CALENDAR_MONTH_DESC, t_c.CHANNEL_DESC
  ORDER BY t_t.CALENDAR_MONTH_DESC, t_c.CHANNEL_DESC
) rs
LIMIT 1000
"""

HiveContext.sql(sqltext).collect.foreach(println)

HTH

Dr Mich Talebzadeh

LinkedIn https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw

http://talebzadehmich.wordpress.com
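The thread never quite settles swetha's question about whether only the needed records are read. One way to make that explicit, rather than relying on the optimizer, is to push the incoming join keys down as a filter before joining. A sketch, reusing the illustrative sessionDF and user_table names from the sketch above and assuming the number of distinct userIds per batch is small enough to collect to the driver:

import sqlContext.implicits._

// Distinct join keys from the incoming session batch.
val ids = sessionDF.select("userId").distinct().map(_.getString(0)).collect()

// Filter the saved table on those keys before joining. With the table
// partitioned by userId (or stored as Parquet), Catalyst can prune
// partitions / push the predicate down, so only matching data is read.
val users  = sqlContext.table("user_table").where($"userId".isin(ids: _*))
val joined = users.join(sessionDF, "userId")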
From: Ted Yu [mailto:yuzhih...@gmail.com]
Sent: 15 February 2016 08:44
To: SRK <swethakasire...@gmail.com>
Cc: user <user@spark.apache.org>
Subject: Re: How to join an RDD with a hive table?

Have you tried creating a DataFrame from the RDD and joining it with the DataFrame which corresponds to the Hive table?

On Sun, Feb 14, 2016 at 9:53 PM, SRK <swethakasire...@gmail.com> wrote:

Hi,

How do I join an RDD with a Hive table and retrieve only the records that I am interested in? Suppose I have an RDD with 1,000 records and a Hive table with 100,000 records; I should be able to join the RDD with the Hive table by an id and load only those 1,000 records from the Hive table, so that there are no memory issues. Also, I was planning on storing the data in Hive in the form of Parquet files. Any help on this is greatly appreciated.

Thanks!
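For completeness, a minimal sketch of Ted's suggestion above (build a DataFrame from the RDD and join it with the DataFrame that corresponds to the Hive table). Here rdd, big_table and the column names are illustrative, and sqlContext is assumed to be a HiveContext so that the Hive table is visible:

import org.apache.spark.sql.functions.broadcast
import sqlContext.implicits._

// DataFrame built from the ~1,000-record RDD of (id, value) pairs.
val smallDF = rdd.toDF("id", "value")

// DataFrame backed by the Hive table (stored as Parquet, as planned above).
val hiveDF = sqlContext.table("big_table")

// Broadcasting the small side keeps the join cheap: the table is still
// scanned, but only the referenced columns are read (Parquet column
// pruning), and the join runs distributed, so nothing close to the
// full table ends up in any single JVM's memory.
val result = hiveDF.join(broadcast(smallDF), "id")
result.show()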