Thanks Sumit, please post back how your tests with HBase go.


On Fri, Jul 29, 2016 at 8:06 PM, Sumit Khanna <sumit.kha...@askme.in> wrote:

> Hey Ayan,
>
> A. Create a table TGT1 as (select key,info from delta UNION ALL select
> key,info from TGT where key not in (select key from SRC)). Rename TGT1 to
> TGT. NOT IN can also be rewritten using an outer join.
> B. Assuming SRC and TGT have a timestamp,
>           B.1. Select the latest record per key from UNION ALL(SRC,TGT) using
> RANK() OVER (PARTITION BY key ORDER BY timestamp DESC)
>           B.2. Create TGT1 from B.1. Rename TGT1 to TGT.
>
> The way we approached this was to broadcast the delta's primary keys, since
> broadcasting the smaller table is said to be better (we also shrink or
> enlarge our run frequency based on traffic), so that the cardinality
> |unique delta primary keys| is a small, broadcastable number indeed. What
> follows is a filter function on each executor, which holds the keys to be
> upserted against (in memory, I believe; a broadcast puts the keys in
> executor memory, doesn't it?). That was the only optimization I could think
> of. With option A, as well as B, there are likely to be huge shuffle costs
> (shuffle hash joins), right?
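>
> To make that concrete, here is a minimal sketch of the broadcast-and-filter
> idea (not our exact code; the names deltaDf, targetDf and the column "key"
> are placeholders, and it assumes Spark 2.x with the delta key set small
> enough to collect to the driver):
>
> import org.apache.spark.sql.functions.{col, udf}
>
> // collect the (small) set of delta primary keys and broadcast it,
> // so every executor holds the set in memory
> val deltaKeys = deltaDf.select("key").distinct()
>   .collect().map(_.getString(0)).toSet
> val bKeys = spark.sparkContext.broadcast(deltaKeys)
>
> // keep only target rows whose key is NOT being upserted in this run,
> // then append the delta -- no shuffle join against the big target table
> val notInDelta = udf((k: String) => !bKeys.value.contains(k))
> val upserted = targetDf.filter(notInDelta(col("key"))).union(deltaDf)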
>
> 1.  If updates are fairly spread across keys, the scheme does not give much
> benefit, as the number of partitions read ~= total number of partitions.
> 2.  This scheme often shows a long-tail problem (think 1 key changed in a
> partition).
>
> 1. is beyond doubt true, because any key, however far back in time /
> partition space, may get updated in the next run. So is 2, as in we make the
> entire partition pass through the filter only to update 1 or 2-3 affected keys.
>
> With the current use case I do not think I can ensure that keys get
> partitioned well and that the delta corresponds to just one partition; that
> would only happen if I maintained date-wise partitions and some concept of
> recency were observed. Let me see how HBase might efficiently tackle this
> classic upsert case.
>
> Thanks,
> Sumit
>
> On Fri, Jul 29, 2016 at 3:22 PM, ayan guha <guha.a...@gmail.com> wrote:
>
>> This is a classic case of Hadoop vs DWH implementation.
>>
>> Source (Delta table): SRC. Target: TGT
>>
>> Requirement: Pure Upsert, i.e. just keep the latest information for each
>> key.
>>
>> Options:
>>
>> A. Create a table TGT1 as (select key,info from delta UNION ALL select
>> key,info from TGT where key not in (select key from SRC)). Rename TGT1 to
>> TGT. NOT IN can also be rewritten using an outer join.
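>>
>> For example, option A in Spark (just a sketch, assuming Spark 2.x with SRC
>> and TGT registered as temp views and columns key, info; the NOT IN is
>> expressed as an outer join with a null check, which also sidesteps NOT IN's
>> null handling; the output path is a placeholder):
>>
>> val tgt1 = spark.sql("""
>>   SELECT key, info FROM SRC
>>   UNION ALL
>>   SELECT t.key, t.info
>>   FROM TGT t LEFT OUTER JOIN SRC s ON t.key = s.key
>>   WHERE s.key IS NULL
>> """)
>> tgt1.write.mode("overwrite").parquet("/path/to/TGT1")  // then swap TGT1 -> TGT
>>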
>> B. Assuming SRC and TGT have a timestamp,
>>           B.1. Select the latest record per key from UNION ALL(SRC,TGT) using
>> RANK() OVER (PARTITION BY key ORDER BY timestamp DESC)
>>           B.2. Create TGT1 from B.1. Rename TGT1 to TGT.
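>>
>> And option B with the DataFrame API (again only a sketch; srcDf/tgtDf and
>> the column names key, ts are placeholders, and row_number() is used instead
>> of rank() so ties still leave exactly one row per key):
>>
>> import org.apache.spark.sql.expressions.Window
>> import org.apache.spark.sql.functions.{col, row_number}
>>
>> val w = Window.partitionBy("key").orderBy(col("ts").desc)
>> val tgt1 = srcDf.union(tgtDf)                    // unionAll on Spark 1.x
>>   .withColumn("rn", row_number().over(w))
>>   .filter(col("rn") === 1)                       // keep the latest row per key
>>   .drop("rn")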
>>
>> Both options are costly. Essentially, more effort can be put into writing
>> complex manipulations: partition the data based on key and read only the
>> partitions which have "changed". A couple of issues:
>> 1.  If updates are fairly spread across keys, the scheme does not give
>> much benefit, as the number of partitions read ~= total number of partitions.
>> 2.  This scheme often shows a long-tail problem (think 1 key changed in a
>> partition).
>>
>> This may be good when partitioning is based on keys and keys increase
>> monotonically. It adds the maintenance of creating more partitions, but it
>> does well to contain the number of partitions read.
>>
>> My advice: give HBase a shot. It gives you UPSERT out of the box. If you
>> want history, just add the timestamp to the key (in reverse). Computation
>> engines support HBase easily.
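>>
>> A minimal sketch of that row-key idea with the plain HBase client (the
>> table name, column family and values below are made up for illustration; a
>> Put on an existing row key is already an upsert, and the reversed timestamp
>> makes the newest version of a key sort first):
>>
>> import org.apache.hadoop.hbase.client.{ConnectionFactory, Put}
>> import org.apache.hadoop.hbase.util.Bytes
>> import org.apache.hadoop.hbase.{HBaseConfiguration, TableName}
>>
>> val conn  = ConnectionFactory.createConnection(HBaseConfiguration.create())
>> val table = conn.getTable(TableName.valueOf("tgt"))      // hypothetical table
>>
>> val (key, ts, info) = ("k1", System.currentTimeMillis(), "v1")
>> // reversed timestamp appended to the key => latest version sorts first
>> val rowKey = Bytes.add(Bytes.toBytes(key), Bytes.toBytes(Long.MaxValue - ts))
>> val put = new Put(rowKey)
>> put.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("info"), Bytes.toBytes(info))
>> table.put(put)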
>>
>> Best
>> Ayan
>>
>> On Fri, Jul 29, 2016 at 5:03 PM, Sumit Khanna <sumit.kha...@askme.in>
>> wrote:
>>
>>> Just a note: I had the delta_df keys for the filter (the NOT
>>> INTERSECTION UDF) broadcast to all the worker nodes, which I think is
>>> an efficient enough move.
>>>
>>> Thanks,
>>>
>>> On Fri, Jul 29, 2016 at 12:19 PM, Sumit Khanna <sumit.kha...@askme.in>
>>> wrote:
>>>
>>>> Hey,
>>>>
>>>> the very first run :
>>>>
>>>> glossary :
>>>>
>>>> delta_df := the dataframe of changes for the current run / execution.
>>>>
>>>> def deduplicate :
>>>> apply a windowing function / group-by to keep one row per deduplication key.
>>>>
>>>> def partitionDataframe(delta_df) :
>>>> get the unique partition-key values of that dataframe and return an array
>>>> of dataframes, each containing only the rows with that same key value.
>>>> This gives the above dataframe partitioned by, say, a date column or
>>>> gender column or age-group column etc.
>>>>
>>>> 0. deduplicate(delta_df) : delta_df [ with a unique primary /
>>>> deduplicating key column ]
>>>> 1. partitionDataframe(delta_df) : Array[delta_df(i to # partitions)]
>>>> 2. write each dataframe to the corresponding parent hdfs path + partition dir
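>>>>
>>>> Roughly, in code (only a sketch; the column names pk / updated_at and the
>>>> partition column are placeholders):
>>>>
>>>> import org.apache.spark.sql.DataFrame
>>>> import org.apache.spark.sql.expressions.Window
>>>> import org.apache.spark.sql.functions.{col, row_number}
>>>>
>>>> // keep only the latest row per primary key within the delta itself
>>>> def deduplicate(deltaDf: DataFrame): DataFrame = {
>>>>   val w = Window.partitionBy("pk").orderBy(col("updated_at").desc)
>>>>   deltaDf.withColumn("rn", row_number().over(w))
>>>>     .filter(col("rn") === 1)
>>>>     .drop("rn")
>>>> }
>>>>
>>>> // split the delta into one dataframe per value of the partition column
>>>> def partitionDataframe(deltaDf: DataFrame, partCol: String): Array[DataFrame] = {
>>>>   val values = deltaDf.select(partCol).distinct().collect().map(_.get(0))
>>>>   values.map(v => deltaDf.filter(col(partCol) === v))
>>>> }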
>>>>
>>>> subsequent runs :
>>>>
>>>> for each partition :
>>>> 0. partitionDataframe(delta_df) : Array[delta_df(i to # partitions)]
>>>> 1. load df from the previous hdfs location of that partition
>>>> 2. filter the above df(p), where p is the partition no., keeping only keys
>>>> not present in delta_df(p) of the current run, i.e. get df(p)[primary column]
>>>> NOT IN delta_df(p), done via a basic "not in" UDF.
>>>> 3. delta_df(p).unionAll(the filtered df above).
>>>> 4. persist the output of 3. via df.write.mode(...).format(...) (rough sketch below).
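>>>>
>>>> Sketch of steps 1-4 for a single partition p (paths, deltaDfP and the
>>>> column name pk are placeholders; assumes the delta keys for that partition
>>>> are small enough to broadcast):
>>>>
>>>> import org.apache.spark.sql.functions.{col, udf}
>>>>
>>>> val partPath = s"$basePath/part=$p"                        // hypothetical layout
>>>> val oldDf = sqlContext.read.parquet(partPath)              // 1. previous state
>>>>
>>>> // 2. drop old rows whose primary key appears in this run's delta
>>>> val deltaKeys = sc.broadcast(
>>>>   deltaDfP.select("pk").collect().map(_.getString(0)).toSet)
>>>> val notInDelta = udf((k: String) => !deltaKeys.value.contains(k))
>>>> val keptDf = oldDf.filter(notInDelta(col("pk")))
>>>>
>>>> // 3. union with the delta, 4. persist (to a temp dir, then swap directories,
>>>> // since we cannot overwrite the path we are still reading from)
>>>> deltaDfP.unionAll(keptDf)
>>>>   .write.mode("overwrite").parquet(partPath + "_tmp")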
>>>>
>>>> Is this the right way of doing the upserts partition-wise? All in all
>>>> it is taking 2 hours for inserting / upserting 500K records in Parquet
>>>> format in hdfs locations where each location gets mapped to one
>>>> partition.
>>>>
>>>> My spark conf specs are :
>>>>
>>>> yarn cluster mode. single node.
>>>> spark.executor.memory 8g
>>>> spark.rpc.netty.dispatcher.numThreads 2
>>>>
>>>> Thanks,
>>>> Sumit
>>>>
>>>>
>>>>
>>>
>>
>>
>> --
>> Best Regards,
>> Ayan Guha
>>
>
>


-- 
Best Regards,
Ayan Guha
