Re: Upsert for hive tables

2019-06-04 Thread tkrol
Hi Magnus,

Yes, I was also thinking about the partitioning approach, and I think it is
the best solution for this type of scenario.

My scenario is also relevant to your last paragraph: the incoming dates are
very random, and I can receive updates from 2012 as well as from 2019.
Therefore, this strategy might not be the best. When I join on, let's say,
month = month AND year = year, I don't think I will get much performance
gain. But I will try this approach.
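For context, the join I have in mind looks roughly like this; a minimal
sketch, with hypothetical table names "target" (partitioned by year and
month) and "updates":

select t.*
from target t
join updates u
  on t.key = u.key
 and t.year = u.year    -- partition columns included in the join
 and t.month = u.month

Since the updates can touch partitions anywhere between 2012 and 2019, such
a join may still end up scanning most of the table.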

If that doesn't work, I will try to play with different partitioning
schemes.

Thanks






Re: Upsert for hive tables

2019-05-30 Thread Magnus Nilsson
Since Parquet doesn't support updates, you have to backfill your dataset. If
that is a regular scenario for you, you should partition your Parquet files
so that backfilling becomes easier.

As the data is structured now, you have to rewrite everything just to upsert
quite a small amount of changed data. Look at your data and your use case,
and use partitioning (and bucketing, if you want to eliminate or reduce
shuffle joins) to store your data in a more optimal way.

Let's say your large table is a timeline of events stretching three years
back, but your updated data is only from the last week or month. If you
partitioned by year/month/week/day, you could backfill just the partitions
that were updated. Adapt the pattern to your particular scenario and data
size.
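For example, something along these lines; a minimal sketch, assuming
hypothetical tables "events" (partitioned by year, month, day) and
"updates", on Spark 2.3+ with dynamic partition overwrite enabled:

set spark.sql.sources.partitionOverwriteMode=dynamic;

-- rebuild only the partitions the update batch touches; in dynamic mode,
-- partitions absent from the result set are left untouched
insert overwrite table events partition (year, month, day)
select key, payload, year, month, day from updates
union all
select e.key, e.payload, e.year, e.month, e.day
from events e
left anti join updates u on e.key = u.key
where e.year = 2019 and e.month = 5;  -- hypothetical affected partitions

This assumes the update batch only holds rows for the partitions you filter
on; the column names are made up for the example.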

If everything is random and there is no reliable way to determine which
partition an update will land in, you could simply break your dataset down
by key % N, where N is roughly
assumed_total_size_of_dataset / suitable_partition_size. There are a lot of
partitioning schemes, but the point is that you have to limit the amount of
data you read from disk, filter, and write back in order to get better
performance.
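A minimal sketch of that idea, with a hypothetical bucket count of 256 and
hypothetical table and column names (again assuming dynamic partition
writes are enabled):

-- store the table partitioned by a synthetic bucket derived from the key
insert overwrite table events partition (bucket)
select key, payload, pmod(hash(key), 256) as bucket
from source;

-- an upsert then only needs to rewrite the buckets containing updated keys
select distinct pmod(hash(key), 256) as bucket from updates;

pmod is used rather than % because hash() can return negative values.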

regards,

Magnus



Re: Upsert for hive tables

2019-05-30 Thread Tomasz Krol
Unfortunately, I don't have timestamps in those tables :( Only a key on
which I can check the existence of a specific record.

But even with a timestamp, how would you make the update? When I say
update, I mean overwriting the existing record.

For example, you have the following in Table A:

key | field1 | field2
1   | a      | b

and in Table B:

key | field1 | field2
1   | c      | d

so after the update I want to have in Table B:

key | field1 | field2
1   | a      | b

I don't want to insert a new row in this case, just overwrite the existing
one.

Thanks



--
Tomasz Krol
patric...@gmail.com


Re: Upsert for hive tables

2019-05-29 Thread Aakash Basu
Don't you have a date/timestamp to handle updates? So you're talking about
CDC? If you have a timestamp, you can check whether the key(s) exist: if a
key exists, check whether the timestamp matches; if it matches, ignore the
record, and if it doesn't, update it.
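Roughly like this; a minimal sketch, assuming hypothetical tables "target"
and "changes", each carrying a key and an updated_at timestamp:

select c.*
from changes c
left join target t on c.key = t.key
where t.key is null                   -- key doesn't exist: insert
   or c.updated_at > t.updated_at;    -- newer timestamp: update

Rows whose timestamps match simply drop out of the result and are ignored.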

>


Re: Upsert for hive tables

2019-05-29 Thread Genieliu
Aren't step 1 and step 2 together producing a copy of Table A?
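If so, the first two steps could collapse into a plain copy; a minimal
sketch, with Table A and Table B written as hypothetical identifiers
table_a and table_b:

-- steps 1 + 2: the semi join plus the anti join return every row of table_a
insert into temp_table select a.cols from table_a a;

-- step 3 unchanged: keep the table_b rows that have no match in table_a
insert into temp_table
select b.cols from table_b b left anti join table_a a on a.key = b.key;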






Re: Upsert for hive tables

2019-05-29 Thread Aakash Basu
Why don't you simply copy the whole of the delta data (Table A) into a
stage table (the temp table in your case) and insert based on a *WHERE NOT
EXISTS* check on the primary key/composite key that already exists in Table
B?
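A minimal sketch of that check, with hypothetical names "stage" (the copy
of Table A) and table_b:

insert into table_b
select s.*
from stage s
where not exists (
  select 1 from table_b b where b.key = s.key
);

In practice you may need to write to a fresh table or path instead, since
Spark can complain about reading from and writing to the same table in one
statement.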

That's faster and does the reconciliation job smoothly enough.

Others, any better input?



Re: Upsert for hive tables

2019-05-29 Thread Tomasz Krol
Hey Aakash,

That will work for the records which don't exist yet in the target table,
but what about the records that have to be updated?

As I mentioned, I want to do an upsert. That means I want to add the
records that don't exist yet and update those that already exist.

Thanks

Tom

--
Tomasz Krol
patric...@gmail.com


Upsert for hive tables

2019-05-29 Thread Tomasz Krol
Hey Guys,

I am wondering what your approach to the following scenario would be:

I have two tables: one (Table A) is relatively small (e.g. 50 GB) and the
second one (Table B) is much bigger (e.g. 3 TB). Both are Parquet tables.

I want to ADD to Table B all records from Table A which don't exist in
Table B yet. I use only one field ("key") to check the existence of a
specific record.

Then I want to UPDATE (with values from Table A) all records in Table B
which also exist in Table A. To determine whether a specific record exists,
I also use the same "key" field.

To achieve the above, I run the following SQL queries (with Table A and
Table B written as table_a and table_b):

1. Find existing records and insert them into the temp table

insert into temp_table
select a.cols from table_a a left semi join table_b b on a.key = b.key

2. Find new records and insert them into the temp table

insert into temp_table
select a.cols from table_a a left anti join table_b b on a.key = b.key

3. Find existing records in Table B which don't exist in Table A

insert into temp_table
select b.cols from table_b b left anti join table_a a on a.key = b.key

In that way I build an updated Table B containing the records from Table A.
However, the problem here is step 3, because I am inserting almost 3 TB of
data, which obviously takes some time. I have tried different approaches,
but no luck so far.

I am wondering what your ideas are: how can we perform this scenario
efficiently in Spark?

Cheers

Tom
-- 
Tomasz Krol
patric...@gmail.com