This is a classic case compared to hadoop vs DWH implmentation.

Source (Delta table): SRC. Target: TGT

Requirement: Pure Upsert, ie just keep the latest information for each key.

Options:

A. Create a table TGT1 as (select key,info from delta UNION ALL select
key,info from TGT where key not in (select key from SRC)). Rename TGT1 to
TGT. Not in can be written other variations using Outer Join
B. Assuming SRC and TGT have a timestamp,
          B.1. Select latest records from UNION ALL(SRC,TGT) using RANK()
OVER PARTITION BY (Key order by timestamp desc)
          B.2. Create TGT1 from B.1. Rename TGT1 to TGT2

Both options are costly. And essentially more effort can be introduced to
write complex manipulations by partitioning data based on key and read only
partitions which are "changed". 3 issues:
1.  if updates are fairly spred across keys, the scheme does not give much
benefit as number of partition read ~= total number of partition.
2.  This scheme often shows long tail problem (Think 1 key changed in a
partition).

This may be good when partition is based on keys and keys increase
monotonically. This adds maintenance of adding more partitions but do well
well to contain number of partitions read.

My advise: Give HBase a shot. It gives UPSERT out of box. If you want
history, just add timestamp in the key (in reverse). Computation engines
easily support HBase.

Best
Ayan

On Fri, Jul 29, 2016 at 5:03 PM, Sumit Khanna <sumit.kha...@askme.in> wrote:

> Just a note, I had the delta_df keys for the filter as in NOT INTERSECTION
> udf broadcasted to all the worker nodes. Which I think is an efficient move
> enough.
>
> Thanks,
>
> On Fri, Jul 29, 2016 at 12:19 PM, Sumit Khanna <sumit.kha...@askme.in>
> wrote:
>
>> Hey,
>>
>> the very first run :
>>
>> glossary :
>>
>> delta_df := current run / execution changes dataframe.
>>
>> def deduplicate :
>> apply windowing function and group by
>>
>> def partitionDataframe(delta_df) :
>> get unique keys of that data frame and then return an array of data
>> frames each containing just that very same key as the column.
>> this will give the above dataframe partitoned as say by date column or
>> gender column or age group column etc etc.
>>
>> 0. deduplicate(delta_df : delta_df [ with all unique primary  /
>> deduplicating key column ]
>> 1. partitionDataframe(delta_df) : Array[delta_df(i to # partitons)]
>> 2. write the dataframe to corresponding parent hdfs path + partiton dir_
>>
>> subsequent runs :
>>
>> for each partition :
>> 0. partitionDataframe(delta_df) : Array[delta_df(i to # partitons)]
>> 1. load df from previous hdfs location of that partition
>> 2. filter the above df(p) where p is the partiton no. such that keys not
>> present in delta_df(p) of current run. i.e get df(p)[primary column] not in
>> delta_df(p). done via a basic ! in UDF.
>> 3. delta_df.unionAll(filtered df above).
>> 4. persist the output of 3. as df.write.mode.format.
>>
>> Is this the right way of doing the upserts partiton wise?  all in all it
>> is taking 2 hours for inserting / upserting 5ooK records in parquet format
>> in some hdfs location where each location gets mapped to one partition.
>>
>> My spark conf specs are :
>>
>> yarn cluster mode. single node.
>> spark.executor.memory 8g
>> spark.rpc.netty.dispatcher.numThreads 2
>>
>> Thanks,
>> Sumit
>>
>>
>>
>


-- 
Best Regards,
Ayan Guha

Reply via email to