Thanks Sumit, please post back how your tests with HBase go.
On Fri, Jul 29, 2016 at 8:06 PM, Sumit Khanna <sumit.kha...@askme.in> wrote:

> Hey Ayan,
>
> A. Create a table TGT1 as (select key,info from delta UNION ALL select
> key,info from TGT where key not in (select key from SRC)). Rename TGT1
> to TGT. NOT IN can be written in other variations using an outer join.
> B. Assuming SRC and TGT have a timestamp,
> B.1. Select the latest records from UNION ALL(SRC,TGT) using
> RANK() OVER (PARTITION BY key ORDER BY timestamp DESC)
> B.2. Create TGT1 from B.1. Rename TGT1 to TGT.
>
> Well, how we approached this was to broadcast the primary keys, since a
> smaller table is said to broadcast better (we make sure that our run
> frequency shrinks or grows based on traffic), so much so that the
> cardinality |unique delta primary keys| is indeed a small and
> broadcastable number. What follows next is a filter function on each
> executor, which has all the keys to be upserted against in memory, I
> believe (broadcast writes the keys into executor memory, doesn't it?).
> That was the only optimization I could think of. With option A as well
> as B, there are likely to be huge shuffle costs (shuffleHashJoins),
> right?
>
> 1. if updates are fairly spread across keys, the scheme does not give
> much benefit as number of partitions read ~= total number of partitions.
> 2. This scheme often shows the long-tail problem (think 1 key changed in
> a partition).
>
> 1. is beyond doubt true, because any of my keys, however far back in
> time/partition space, may get updated in the next run. So is 2., as we
> make the entire partition pass through the filter for updating only 1 or
> 2-3 affected keys.
>
> With the current use case, I do not think I can ensure that keys get
> partitioned well and that the delta corresponds to just one partition;
> that would only happen if I maintained date-wise partitions and some
> concept of recency were observed. Let me see how HBase might efficiently
> tackle this classic upsert case.
>
> Thanks,
> Sumit
>
> On Fri, Jul 29, 2016 at 3:22 PM, ayan guha <guha.a...@gmail.com> wrote:
>
>> This is a classic case of a Hadoop vs DWH implementation.
>>
>> Source (delta table): SRC. Target: TGT.
>>
>> Requirement: pure upsert, i.e. just keep the latest information for
>> each key.
>>
>> Options:
>>
>> A. Create a table TGT1 as (select key,info from delta UNION ALL select
>> key,info from TGT where key not in (select key from SRC)). Rename TGT1
>> to TGT. NOT IN can be written in other variations using an outer join.
>> B. Assuming SRC and TGT have a timestamp,
>> B.1. Select the latest records from UNION ALL(SRC,TGT) using
>> RANK() OVER (PARTITION BY key ORDER BY timestamp DESC)
>> B.2. Create TGT1 from B.1. Rename TGT1 to TGT.
>>
>> Both options are costly. Essentially, more effort can be put into
>> complex manipulations that partition the data by key and read only the
>> partitions which have "changed". Two issues:
>> 1. if updates are fairly spread across keys, the scheme does not give
>> much benefit as number of partitions read ~= total number of
>> partitions.
>> 2. This scheme often shows the long-tail problem (think 1 key changed
>> in a partition).
>>
>> This may be good when partitioning is based on keys and keys increase
>> monotonically. It adds the maintenance of creating more partitions, but
>> does well to contain the number of partitions read.
>>
>> My advice: give HBase a shot. It gives you UPSERT out of the box. If
>> you want history, just add a timestamp to the key (in reverse).
>> Computation engines easily support HBase.
>>
>> Best
>> Ayan
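[For concreteness, a minimal Spark SQL sketch of options A and B above. It assumes Hive-backed tables SRC and TGT that both have columns (key, info, ts); the column names are placeholders, and the LEFT ANTI JOIN used here as one of the "outer join variations" of NOT IN needs Spark 2.0+:]

    -- Option A: delta rows, plus target rows whose key is absent from the delta.
    CREATE TABLE TGT1 AS
    SELECT key, info, ts FROM SRC
    UNION ALL
    SELECT t.key, t.info, t.ts
    FROM TGT t LEFT ANTI JOIN SRC s ON t.key = s.key;
    -- then drop TGT and rename TGT1 to TGT.

    -- Option B: rank the union by timestamp per key and keep only the latest row.
    CREATE TABLE TGT1 AS
    SELECT key, info, ts
    FROM (
      SELECT key, info, ts,
             RANK() OVER (PARTITION BY key ORDER BY ts DESC) AS rnk
      FROM (SELECT key, info, ts FROM SRC
            UNION ALL
            SELECT key, info, ts FROM TGT) unioned
    ) ranked
    WHERE rnk = 1;
    -- then rename TGT1 to TGT. ROW_NUMBER() instead of RANK() would also
    -- break ties when two rows of the same key share a timestamp.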
>>
>> On Fri, Jul 29, 2016 at 5:03 PM, Sumit Khanna <sumit.kha...@askme.in>
>> wrote:
>>
>>> Just a note: I had the delta_df keys for the filter (the NOT
>>> INTERSECTION udf) broadcast to all the worker nodes, which I think is
>>> an efficient enough move.
>>>
>>> Thanks,
>>>
>>> On Fri, Jul 29, 2016 at 12:19 PM, Sumit Khanna <sumit.kha...@askme.in>
>>> wrote:
>>>
>>>> Hey,
>>>>
>>>> the very first run:
>>>>
>>>> glossary:
>>>>
>>>> delta_df := the current run's / execution's changes dataframe.
>>>>
>>>> def deduplicate:
>>>> apply a windowing function and group by.
>>>>
>>>> def partitionDataframe(delta_df):
>>>> get the unique keys of that dataframe and then return an array of
>>>> dataframes, each containing just that very same key as the column
>>>> value. This gives the above dataframe partitioned, say, by date
>>>> column or gender column or age-group column, etc.
>>>>
>>>> 0. deduplicate(delta_df): delta_df [with all unique primary /
>>>> deduplicating key columns]
>>>> 1. partitionDataframe(delta_df): Array[delta_df(i to # partitions)]
>>>> 2. write the dataframe to the corresponding parent HDFS path +
>>>> partition dir_
>>>>
>>>> subsequent runs:
>>>>
>>>> for each partition:
>>>> 0. partitionDataframe(delta_df): Array[delta_df(i to # partitions)]
>>>> 1. load df from the previous HDFS location of that partition
>>>> 2. filter the above df(p), where p is the partition no., such that
>>>> only keys not present in delta_df(p) of the current run remain, i.e.
>>>> get df(p)[primary column] not in delta_df(p), done via a basic "not
>>>> in" UDF.
>>>> 3. delta_df.unionAll(the filtered df above).
>>>> 4. persist the output of 3. via df.write.mode(...).format(...).
>>>>
>>>> Is this the right way of doing the upserts partition-wise? All in all
>>>> it is taking 2 hours for inserting / upserting 500K records in
>>>> parquet format in some HDFS location where each location gets mapped
>>>> to one partition.
>>>>
>>>> My spark conf specs are:
>>>>
>>>> yarn cluster mode. single node.
>>>> spark.executor.memory 8g
>>>> spark.rpc.netty.dispatcher.numThreads 2
>>>>
>>>> Thanks,
>>>> Sumit
>>
>> --
>> Best Regards,
>> Ayan Guha

--
Best Regards,
Ayan Guha
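[And for reference, a rough Scala sketch (Spark 1.6-style API) of the per-partition upsert Sumit describes in his 12:19 PM message: broadcast the delta keys, filter the previously written partition through a "not in" udf, union with the delta, and persist. The column name "id", the function name, and the path handling are illustrative assumptions, not the original code:]

    import org.apache.spark.sql.{DataFrame, SQLContext}
    import org.apache.spark.sql.functions.udf

    def upsertPartition(sqlContext: SQLContext,
                        deltaP: DataFrame,      // delta_df(p) for partition p
                        partitionPath: String): Unit = {
      // Step 0 happened upstream: deduplicate and split delta_df per partition.
      // Broadcast the (assumed small) set of changed primary keys.
      val keys  = deltaP.select("id").rdd.map(_.getString(0)).collect().toSet
      val bKeys = sqlContext.sparkContext.broadcast(keys)

      // The "not in delta" filter, evaluated on each executor against the
      // broadcast key set held in executor memory.
      val notInDelta = udf((k: String) => !bKeys.value.contains(k))

      val previous  = sqlContext.read.parquet(partitionPath)        // step 1
      val untouched = previous.filter(notInDelta(previous("id")))   // step 2
      val upserted  = deltaP.unionAll(untouched)                    // step 3

      // Step 4: persist. Writing to a temp path and swapping directories
      // afterwards avoids overwriting the same files the job is reading.
      upserted.write.mode("overwrite").parquet(partitionPath + "_tmp")
    }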