Just a note: I had the delta_df keys for the filter (the "not in" / anti-intersection UDF) broadcast to all the worker nodes, which I think is efficient enough.
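For reference, a minimal sketch of that broadcast "not in" filter, assuming a string primary-key column; existingDf, deltaDf and keyCol are placeholders here, not the actual job code:

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.{col, udf}

// keep only the rows of existingDf whose key is NOT present in deltaDf
def antiFilter(existingDf: DataFrame, deltaDf: DataFrame, keyCol: String): DataFrame = {
  // collect the distinct delta keys on the driver and broadcast them to every executor
  val deltaKeys = deltaDf.select(keyCol).distinct().collect().map(_.getString(0)).toSet
  val bKeys = existingDf.sqlContext.sparkContext.broadcast(deltaKeys)

  // "! in" UDF: true only for keys absent from the current delta
  val notInDelta = udf((k: String) => !bKeys.value.contains(k))
  existingDf.filter(notInDelta(col(keyCol)))
}

If the delta key set ever stops fitting comfortably in memory, comparing this against a plain left outer join on the key column with an IS NULL filter might be worthwhile.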
Thanks,

On Fri, Jul 29, 2016 at 12:19 PM, Sumit Khanna <sumit.kha...@askme.in> wrote:

> Hey,
>
> the very first run :
>
> glossary :
>
> delta_df := the current run's / execution's changes dataframe.
>
> def deduplicate :
> apply a windowing function and group by
>
> def partitionDataframe(delta_df) :
> get the unique keys of that dataframe and return an array of dataframes,
> each containing just that one key value in the column. This gives the above
> dataframe partitioned, say, by a date column or gender column or age group
> column etc.
>
> 0. deduplicate(delta_df) : delta_df [with only unique primary /
> deduplicating key columns]
> 1. partitionDataframe(delta_df) : Array[delta_df(i to # partitions)]
> 2. write each dataframe to the corresponding parent hdfs path + partition dir
>
> subsequent runs :
>
> for each partition :
> 0. partitionDataframe(delta_df) : Array[delta_df(i to # partitions)]
> 1. load df from the previous hdfs location of that partition
> 2. filter the above df(p), where p is the partition no., keeping only keys
> not present in delta_df(p) of the current run, i.e. get df(p)[primary column]
> not in delta_df(p). Done via a basic "! in" UDF.
> 3. delta_df.unionAll(filtered df above).
> 4. persist the output of 3. via df.write.mode.format.
>
> Is this the right way of doing the upserts partition-wise? All in all it is
> taking 2 hours to insert / upsert 500K records in parquet format in an hdfs
> location where each location gets mapped to one partition.
>
> My spark conf specs are :
>
> yarn cluster mode. single node.
> spark.executor.memory 8g
> spark.rpc.netty.dispatcher.numThreads 2
>
> Thanks,
> Sumit
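For what it's worth, here is a rough, self-contained sketch of the flow quoted above (deduplicate, split by partition key, drop overlapping keys from the existing data, unionAll, write). The column names ("pk", "updated_at"), the string key type and the path handling are my assumptions, not the original job:

import org.apache.spark.sql.{DataFrame, SQLContext}
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{col, row_number, udf}

// 0. deduplicate: keep one (latest) row per primary key via a window function
def deduplicate(deltaDf: DataFrame): DataFrame = {
  val w = Window.partitionBy("pk").orderBy(col("updated_at").desc)
  deltaDf.withColumn("rn", row_number().over(w)).filter(col("rn") === 1).drop("rn")
}

// 1. split the delta into one dataframe per distinct partition-column value
def partitionDataframe(deltaDf: DataFrame, partCol: String): Array[(Any, DataFrame)] =
  deltaDf.select(partCol).distinct().collect().map { row =>
    val v = row.get(0)
    (v, deltaDf.filter(col(partCol) === v))
  }

// subsequent runs, per partition: load the existing data, drop keys present in
// the delta (broadcast "! in" UDF), unionAll with the delta and persist
def upsertPartition(sqlContext: SQLContext, deltaPart: DataFrame, partPath: String): Unit = {
  val existing = sqlContext.read.parquet(partPath)

  val deltaKeys = sqlContext.sparkContext.broadcast(
    deltaPart.select("pk").distinct().collect().map(_.getString(0)).toSet)
  val notInDelta = udf((k: String) => !deltaKeys.value.contains(k))
  val survivors = existing.filter(notInDelta(col("pk")))

  // write to a side path first; overwriting the very path that is being read
  // within the same job is not safe, so the directories would need a swap/rename
  deltaPart.unionAll(survivors)
    .write.mode("overwrite").parquet(partPath + "_tmp")
}

On the 2-hour question: note that partitionDataframe as sketched does a distinct + collect and then one filter pass per distinct partition value, so with many partition values the delta gets rescanned many times; caching the delta before that loop, or letting the writer split the data via write.partitionBy on the partition column, might be worth comparing.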