Re: Upsert for Hive tables
Hi Magnus,

Yes, I was also thinking about the partitioning approach, and I think it is the best solution for this type of scenario. My scenario also matches your last paragraph: the incoming dates are very random. I can get updates from 2012 and from 2019. Therefore that strategy might not be the best, because when I join on, say, month = month AND year = year, I don't think I will get much performance gain. But I will try this approach. If it doesn't work, then I will experiment with different partitioning schemes.

Thanks

--
Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/
To unsubscribe e-mail: user-unsubscr...@spark.apache.org
Re: Upsert for Hive tables
Since Parquet doesn't support updates, you have to backfill your dataset. If that is a regular scenario for you, you should partition your Parquet files so that backfilling becomes easier. As the data is structured now, you have to rewrite everything just to upsert quite a small amount of changed data.

Look at your data, look at your use case, and use partitioning (and bucketing, if you want to eliminate or reduce shuffle joins) to store your data in a more optimal way. Let's say your large table is a timeline of events stretching three years back, but your updated data is only from the last week or month. If you partitioned by year/month/week/day, you could backfill just the partitions that were updated. Adapt the pattern to your particular scenario and data size.

If everything is random and there is no sure way to decide which partition an update will land in, you could break your dataset down by key % (assumed_total_size_of_dataset / suitable_partition_size). There are a lot of partitioning schemes, but the point is that you have to limit the amount of data you read from disk, filter, and write back in order to get better performance.

regards,
Magnus

On Wed, May 29, 2019 at 7:20 PM Tomasz Krol wrote:

> Hey Guys,
>
> I am wondering what would be your approach to the following scenario:
>
> I have two tables - one (Table A) is relatively small (e.g. 50 GB) and the second one (Table B) much bigger (e.g. 3 TB). Both are Parquet tables.
>
> I want to ADD all records from Table A to Table B which don't exist in Table B yet. I use only one field (e.g. key) to check existence for a specific record.
>
> Then I want to UPDATE (with values from Table A) all records in Table B which also exist in Table A. To determine if a specific record exists I use the same "key" field.
>
> To achieve the above I run the following SQL queries:
>
> 1. Find existing records and insert them into a temp table:
>
> insert into temp_table select a.cols from Table A a left semi join Table B b on a.key = b.key
>
> 2. Find new records and insert them into the temp table:
>
> insert into temp_table select a.cols from Table A a left anti join Table B b on a.key = b.key
>
> 3. Find records in Table B which don't exist in Table A:
>
> insert into temp_table select b.cols from Table B b left anti join Table A a on a.key = b.key
>
> In that way I build Table B updated with records from Table A. However, the problem here is step 3, because I am inserting almost 3 TB of data, which obviously takes some time. I have tried different approaches but with no luck.
>
> I am wondering what your ideas are - how can we perform this scenario efficiently in Spark?
>
> Cheers
>
> Tom
> --
> Tomasz Krol
> patric...@gmail.com
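Magnus's `key % N` bucketing idea can be sketched in plain Python, no Spark required. The bucket count, keys, and helper names below are made-up for illustration; in Spark the same idea maps onto `DataFrameWriter.partitionBy` or `bucketBy`:

```python
# Sketch of the bucketing scheme: rows are assigned to buckets by key, so
# an upsert only has to read, merge, and rewrite the buckets that actually
# contain changed keys, instead of the whole table.
# NUM_BUCKETS is a toy value; in practice it would be roughly
# assumed_total_size_of_dataset / suitable_partition_size.
NUM_BUCKETS = 4

def bucket_of(key):
    """Deterministic bucket assignment, mirroring key % num_buckets."""
    return key % NUM_BUCKETS

def affected_buckets(changed_keys):
    """Only these buckets need to be rewritten during backfill."""
    return {bucket_of(k) for k in changed_keys}

# The large table, conceptually split into buckets (toy data here).
table_b = {b: {} for b in range(NUM_BUCKETS)}
for key in range(100):
    table_b[bucket_of(key)][key] = ("old", key)

# Incoming delta (Table A): only a handful of keys changed.
delta = {7: ("new", 7), 11: ("new", 11)}

to_rewrite = affected_buckets(delta)   # {3}, since 7 % 4 == 11 % 4 == 3
for b in to_rewrite:                   # rewrite only 1 of the 4 buckets
    table_b[b].update({k: v for k, v in delta.items() if bucket_of(k) == b})
```

The point of the sketch is the pruning: only one bucket out of four is touched, which is exactly the "limit the amount of data to read, filter, and write back" advice above.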
Re: Upsert for Hive tables
Unfortunately, I don't have timestamps in those tables :( Only a key on which I can check the existence of a specific record. But even with a timestamp, how would you make the update? When I say update, I mean overwriting the existing record. For example, you have the following in Table A:

key | field1 | field2
1   | a      | b

and in Table B:

key | field1 | field2
1   | c      | d

so after the update I want to have in Table B:

key | field1 | field2
1   | a      | b

I don't want to insert a new row in this case, just overwrite the existing one.

Thanks

On Thu 30 May 2019 at 05:10, Aakash Basu wrote:

> Don't you have a date/timestamp to handle updates? So, you're talking about CDC? If you have a datestamp you can check if that/those key(s) exist; if they exist, check if the timestamp matches; if it matches, ignore; if it doesn't, update.
>
> On Thu 30 May, 2019, 7:11 AM Genieliu wrote:
>
>> Aren't step 1 and step 2 producing a copy of Table A?

--
Tomasz Krol
patric...@gmail.com
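The overwrite semantics described above amount to a keyed merge in which Table A wins on conflicts. A minimal sketch in plain Python, using the toy column values from the example (names are illustrative only):

```python
# Upsert semantics: rows from table_a overwrite rows in table_b that share
# the same key; keys present only in table_b are kept unchanged.
table_a = {1: {"field1": "a", "field2": "b"}}
table_b = {1: {"field1": "c", "field2": "d"},
           2: {"field1": "x", "field2": "y"}}

# Later entries win on duplicate keys, so table_a overwrites table_b.
merged = {**table_b, **table_a}
```

After the merge, key 1 carries Table A's values (a, b) rather than Table B's (c, d), and key 2 is untouched, which is exactly the "overwrite, don't insert a new row" behaviour being asked for.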
Re: Upsert for Hive tables
Don't you have a date/timestamp to handle updates? So, you're talking about CDC? If you have a datestamp you can check if that/those key(s) exist; if they exist, check if the timestamp matches; if it matches, ignore the record; if it doesn't, update it.

On Thu 30 May, 2019, 7:11 AM Genieliu wrote:

> Aren't step 1 and step 2 producing a copy of Table A?
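The decision rule above can be written out explicitly. A plain-Python sketch with invented function and field names (in practice this comparison would run as a join in Spark, not row by row):

```python
def cdc_action(incoming, target):
    """Classify an incoming CDC record against the target table.

    incoming: dict with "key" and "ts" (the record's timestamp)
    target:   mapping of key -> last-seen timestamp in the target table

    Rule: unseen key -> insert; same timestamp -> ignore (nothing
    changed); differing timestamp -> update the existing row.
    """
    key, ts = incoming["key"], incoming["ts"]
    if key not in target:
        return "insert"
    if target[key] == ts:
        return "ignore"
    return "update"
```

For example, with a target snapshot `{1: 100}`, a record for key 3 is an insert, key 1 at timestamp 100 is ignored, and key 1 at timestamp 150 becomes an update.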
Re: Upsert for Hive tables
Aren't step 1 and step 2 producing a copy of Table A?
Re: Upsert for Hive tables
Why don't you simply copy the whole of the delta data (Table A) into a stage table (the temp table in your case) and then insert depending on a *WHERE NOT EXISTS* check on the primary key/composite key which already exists in Table B?

That's faster and does the reconciliation job smoothly enough.

Others, any better input?

On Wed 29 May, 2019, 10:50 PM Tomasz Krol wrote:

> Hey Guys,
>
> I am wondering what would be your approach to the following scenario:
>
> [...]
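The stage-table pattern above can be mimicked with plain Python dicts: a sketch of the *WHERE NOT EXISTS* insert, where the table names and toy rows are illustrative, not the real schema:

```python
# Stage-table pattern: copy the delta (Table A) into a stage, then insert
# into the target only those staged keys that do not already exist there,
# mirroring INSERT ... WHERE NOT EXISTS on the key.
stage = {10: "new-row", 1: "changed-row"}     # delta / Table A
table_b = {1: "old-row", 2: "other-row"}      # target / Table B

for key, row in stage.items():
    if key not in table_b:                    # the WHERE NOT EXISTS check
        table_b[key] = row                    # only key 10 is inserted
```

Note what the sketch shows: key 10 is inserted, but key 1 keeps its old value, since the existence check filters it out rather than overwriting it.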
Re: Upsert for Hive tables
Hey Aakash,

That will work for records which don't exist yet in the target table. But what about records which have to be updated? As I mentioned, I want to do an upsert. That means I want to add the records which don't exist yet and update those which already exist.

Thanks

Tom

On Wed 29 May 2019 at 18:39, Aakash Basu wrote:

> Why don't you simply copy the whole of the delta data (Table A) into a stage table (the temp table in your case) and then insert depending on a *WHERE NOT EXISTS* check on the primary key/composite key which already exists in Table B?
>
> That's faster and does the reconciliation job smoothly enough.
>
> Others, any better input?
>
> On Wed 29 May, 2019, 10:50 PM Tomasz Krol wrote:
>
>> Hey Guys,
>>
>> I am wondering what would be your approach to the following scenario:
>>
>> [...]

--
Tomasz Krol
patric...@gmail.com
Upsert for Hive tables
Hey Guys,

I am wondering what would be your approach to the following scenario:

I have two tables - one (Table A) is relatively small (e.g. 50 GB) and the second one (Table B) much bigger (e.g. 3 TB). Both are Parquet tables.

I want to ADD all records from Table A to Table B which don't exist in Table B yet. I use only one field (e.g. key) to check existence for a specific record.

Then I want to UPDATE (with values from Table A) all records in Table B which also exist in Table A. To determine if a specific record exists I use the same "key" field.

To achieve the above I run the following SQL queries:

1. Find existing records and insert them into a temp table:

insert into temp_table select a.cols from Table A a left semi join Table B b on a.key = b.key

2. Find new records and insert them into the temp table:

insert into temp_table select a.cols from Table A a left anti join Table B b on a.key = b.key

3. Find records in Table B which don't exist in Table A:

insert into temp_table select b.cols from Table B b left anti join Table A a on a.key = b.key

In that way I build Table B updated with records from Table A. However, the problem here is step 3, because I am inserting almost 3 TB of data, which obviously takes some time. I have tried different approaches but with no luck.

I am wondering what your ideas are - how can we perform this scenario efficiently in Spark?

Cheers

Tom
--
Tomasz Krol
patric...@gmail.com
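The three steps above can be mirrored with plain Python dicts to make the data flow concrete (toy tables; in Spark these correspond to the semi/anti joins listed, and step 3 is the one that touches nearly all of Table B):

```python
# Toy stand-ins for Table A (delta, ~50 GB) and Table B (target, ~3 TB).
table_a = {1: "a1", 2: "a2"}
table_b = {2: "b2", 3: "b3", 4: "b4"}

# Step 1 - left semi join: rows of A whose key exists in B (the updates).
step1 = {k: v for k, v in table_a.items() if k in table_b}

# Step 2 - left anti join: rows of A whose key is absent from B (the inserts).
step2 = {k: v for k, v in table_a.items() if k not in table_b}

# Step 3 - left anti join the other way: rows of B untouched by A.
# This is the expensive step, since it rewrites almost all of B.
step3 = {k: v for k, v in table_b.items() if k not in table_a}

# The temp table is the union of the three, i.e. the rebuilt Table B.
temp_table = {**step3, **step1, **step2}
```

With these toy tables, key 2 is updated to A's value, key 1 is newly inserted, and keys 3 and 4 are carried over unchanged from B by step 3.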