Hey Ayan,

A. Create a table TGT1 as (select key,info from delta UNION ALL select
key,info from TGT where key not in (select key from SRC)). Rename TGT1 to
TGT. The NOT IN can also be written in other ways, e.g. with an outer join.
B. Assuming SRC and TGT have a timestamp,
          B.1. Select the latest record per key from UNION ALL(SRC,TGT) using
RANK() OVER (PARTITION BY key ORDER BY timestamp DESC)
          B.2. Create TGT1 from B.1. Rename TGT1 to TGT
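
For reference, a minimal sketch of options A and B as plain SQL, run here
through Python's built-in sqlite3 module rather than Spark or Hive. The
column name ts, the sample rows and the helper names are assumptions made
for illustration only; SRC is the delta table. The window function in
option B needs SQLite 3.25 or newer.

```python
import sqlite3

def make_db():
    """Build an in-memory database with a small TGT and SRC (the delta)."""
    con = sqlite3.connect(":memory:")
    con.executescript("""
        CREATE TABLE TGT (key INTEGER, info TEXT, ts INTEGER);
        CREATE TABLE SRC (key INTEGER, info TEXT, ts INTEGER);
        INSERT INTO TGT VALUES (1, 'a-old', 10), (2, 'b-old', 10), (3, 'c-old', 10);
        INSERT INTO SRC VALUES (2, 'b-new', 20), (4, 'd-new', 20);
    """)
    return con

def swap_in(con):
    """Replace TGT with the freshly built TGT1."""
    con.executescript("DROP TABLE TGT; ALTER TABLE TGT1 RENAME TO TGT;")

def upsert_option_a(con):
    # Option A: all delta rows, plus the target rows whose key is not in the delta.
    con.execute("""
        CREATE TABLE TGT1 AS
        SELECT key, info, ts FROM SRC
        UNION ALL
        SELECT key, info, ts FROM TGT
        WHERE key NOT IN (SELECT key FROM SRC)
    """)
    swap_in(con)

def upsert_option_b(con):
    # Option B: union everything, then keep the newest row per key.
    con.execute("""
        CREATE TABLE TGT1 AS
        SELECT key, info, ts FROM (
            SELECT key, info, ts,
                   RANK() OVER (PARTITION BY key ORDER BY ts DESC) AS rnk
            FROM (SELECT key, info, ts FROM SRC
                  UNION ALL
                  SELECT key, info, ts FROM TGT)
        )
        WHERE rnk = 1
    """)
    swap_in(con)

def rows(con):
    """Return the current contents of TGT, ordered by key."""
    return con.execute("SELECT key, info FROM TGT ORDER BY key").fetchall()
```

Both functions rewrite the whole target, which is where the cost comes
from. Note that RANK() keeps every row of a tie, so if a key can appear
twice with the same timestamp, ROW_NUMBER() would be the safer choice.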

Well, the way we approached this was to broadcast the primary keys of the
delta. That is supposed to work better when the table is small, and we make
sure that our run frequency is shrunk or enlarged based on traffic so that
the cardinality of the unique delta primary keys stays a small and
broadcastable number. What follows next is a filter function on each
executor, which has all the keys to be upserted against with it (in memory,
I believe; broadcast keeps the keys in executor memory, doesn't it?). That
was the only optimization I could think of. With option A, as well as B,
there are likely to be huge shuffle costs (shuffled hash joins), right?
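
The broadcast-and-filter idea can be modelled in plain Python as below. This
is only a sketch of the logic, not Spark code: the frozenset stands in for a
broadcast variable (in Spark it would be created with sc.broadcast and read
on the executors), and the row layout and function names are assumptions.

```python
def upsert_partition(existing_rows, delta_rows, broadcast_keys):
    """Upsert one partition: drop superseded rows, then add the delta rows."""
    # Keep only the existing rows whose key is NOT among the delta keys.
    kept = [row for row in existing_rows if row["key"] not in broadcast_keys]
    return delta_rows + kept

def upsert_all(partitions, delta_by_partition):
    """Both arguments map a partition name to a list of row dicts."""
    # The set of delta keys is built once and shared read-only by every
    # partition's filter, standing in for a Spark broadcast variable.
    broadcast_keys = frozenset(
        row["key"] for rows in delta_by_partition.values() for row in rows
    )
    result = {}
    for name in set(partitions) | set(delta_by_partition):
        result[name] = upsert_partition(
            partitions.get(name, []),
            delta_by_partition.get(name, []),
            broadcast_keys,
        )
    return result
```

The filter is a set lookup per row, so no shuffle is involved, but every
row of every loaded partition still passes through it.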

1.  if updates are fairly spread across keys, the scheme does not give much
benefit, as the number of partitions read ~= the total number of partitions.
2.  This scheme often shows a long tail problem (think of 1 key changed in a
partition).

1. is beyond doubt true, because any key, however far back in time/partition
space, may get updated in the next run. So is 2, since we make the entire
partition pass through the filter just to update the 1, 2 or 3 affected keys.
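
To make the two points concrete, here is a small sketch that counts how many
partitions a delta touches. The crc32-based partitioner and the function
names are assumptions for illustration; the real layout here is by date (or
similar) column, not by hash.

```python
import zlib

def partition_of(key, num_partitions):
    """Stable hash partitioner (crc32, so results do not vary between runs)."""
    return zlib.crc32(str(key).encode("utf-8")) % num_partitions

def touched_partitions(delta_keys, num_partitions):
    """Map each partition that must be rewritten to its number of delta keys."""
    counts = {}
    for key in delta_keys:
        p = partition_of(key, num_partitions)
        counts[p] = counts.get(p, 0) + 1
    return counts
```

When len(touched_partitions(keys, n)) is close to n, point 1 applies. Any
entry with a count of 1 is the long tail of point 2: a whole partition is
read and rewritten for a single key.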

With the current use case I do not think I can ensure that keys get
partitioned well and that the delta corresponds to just one partition. That
would happen only if I maintained date-wise partitions alone and some
concept of recency held. Let me see how HBase might efficiently tackle this
classic upsert case.
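
As a rough model of what the HBase suggestion buys, the toy store below
keeps row keys sorted and lets a put overwrite an existing row key, which is
the upsert. It is plain Python, not the HBase API; SortedKVStore,
history_key, latest, the "|" separator and MAX_TS are all assumptions made
for this sketch.

```python
import bisect

MAX_TS = 2**63 - 1  # assumed upper bound, mirrors a signed 64-bit long

class SortedKVStore:
    """Toy stand-in for a store that keeps row keys in sorted order."""

    def __init__(self):
        self._keys = []    # sorted row keys
        self._values = {}  # row key -> value

    def put(self, row_key, value):
        # A put on an existing row key overwrites it: that is the upsert.
        if row_key not in self._values:
            bisect.insort(self._keys, row_key)
        self._values[row_key] = value

    def scan_prefix(self, prefix):
        """Yield (row_key, value) in key order for keys starting with prefix."""
        i = bisect.bisect_left(self._keys, prefix)
        while i < len(self._keys) and self._keys[i].startswith(prefix):
            yield self._keys[i], self._values[self._keys[i]]
            i += 1

def history_key(key, ts):
    # Reverse timestamp, zero padded so string order matches numeric order.
    return "%s|%019d" % (key, MAX_TS - ts)

def latest(store, key):
    """With reversed timestamps the newest version sorts first under the key."""
    return next(store.scan_prefix(key + "|"), None)
```

With the plain key as row key only the latest value survives; with
history_key every version is kept and the newest one is the first hit of a
prefix scan.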

Thanks,
Sumit

On Fri, Jul 29, 2016 at 3:22 PM, ayan guha <guha.a...@gmail.com> wrote:

> This is a classic case of Hadoop vs DWH implementation.
>
> Source (Delta table): SRC. Target: TGT
>
> Requirement: Pure Upsert, ie just keep the latest information for each
> key.
>
> Options:
>
> A. Create a table TGT1 as (select key,info from delta UNION ALL select
> key,info from TGT where key not in (select key from SRC)). Rename TGT1 to
> TGT. The NOT IN can also be written in other ways, e.g. with an outer join.
> B. Assuming SRC and TGT have a timestamp,
>           B.1. Select the latest record per key from UNION ALL(SRC,TGT) using
> RANK() OVER (PARTITION BY key ORDER BY timestamp DESC)
>           B.2. Create TGT1 from B.1. Rename TGT1 to TGT
>
> Both options are costly. More effort can go into writing more complex
> manipulations: partition the data based on key and read only the
> partitions which are "changed". Issues:
> 1.  if updates are fairly spread across keys, the scheme does not give much
> benefit, as the number of partitions read ~= the total number of partitions.
> 2.  This scheme often shows a long tail problem (think of 1 key changed in a
> partition).
>
> This may be good when partitioning is based on keys and keys increase
> monotonically. This adds the maintenance of adding more partitions, but does
> well to contain the number of partitions read.
>
> My advice: Give HBase a shot. It gives UPSERT out of the box. If you want
> history, just add the timestamp to the key (in reverse). Computation engines
> easily support HBase.
>
> Best
> Ayan
>
> On Fri, Jul 29, 2016 at 5:03 PM, Sumit Khanna <sumit.kha...@askme.in>
> wrote:
>
>> Just a note: I had the delta_df keys broadcast to all the worker nodes for
>> the filter (the NOT-in-delta UDF), which I think is an efficient enough
>> move.
>>
>> Thanks,
>>
>> On Fri, Jul 29, 2016 at 12:19 PM, Sumit Khanna <sumit.kha...@askme.in>
>> wrote:
>>
>>> Hey,
>>>
>>> the very first run :
>>>
>>> glossary :
>>>
>>> delta_df := current run / execution changes dataframe.
>>>
>>> def deduplicate :
>>> apply windowing function and group by
>>>
>>> def partitionDataframe(delta_df) :
>>> get the unique values of the partition column of that data frame and then
>>> return an array of data frames, each containing just one of those values
>>> in that column. this will give the above dataframe partitioned by, say,
>>> the date column or gender column or age group column etc.
>>>
>>> 0. deduplicate(delta_df) : delta_df [ with all unique primary /
>>> deduplicating key column ]
>>> 1. partitionDataframe(delta_df) : Array[delta_df(i to # partitions)]
>>> 2. write each dataframe to the corresponding parent hdfs path + partition dir
>>>
>>> subsequent runs :
>>>
>>> for each partition :
>>> 0. partitionDataframe(delta_df) : Array[delta_df(i to # partitions)]
>>> 1. load df from the previous hdfs location of that partition
>>> 2. filter the above df(p), where p is the partition no., keeping only the
>>> keys not present in delta_df(p) of the current run, i.e. get
>>> df(p)[primary column] not in delta_df(p). done via a basic "not in" UDF.
>>> 3. delta_df(p).unionAll(filtered df above).
>>> 4. persist the output of 3. via df.write.mode.format.
>>>
>>> Is this the right way of doing the upserts partition-wise? All in all it
>>> is taking 2 hours to insert / upsert 500K records in parquet format
>>> into hdfs locations, where each location maps to one partition.
>>>
>>> My spark conf specs are :
>>>
>>> yarn cluster mode. single node.
>>> spark.executor.memory 8g
>>> spark.rpc.netty.dispatcher.numThreads 2
>>>
>>> Thanks,
>>> Sumit
>>>
>>>
>>>
>>
>
>
> --
> Best Regards,
> Ayan Guha
>
