Just a note: I have the delta_df keys used for the NOT-INTERSECTION filter UDF
broadcast to all the worker nodes, which I think is an efficient enough move.
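
For reference, a rough sketch of that filter, assuming the deduplicating key
column is called "id" and that sc, deltaDf and the previously persisted oldDf
are already in scope (the names here are illustrative, not the actual ones):

    import org.apache.spark.sql.functions.udf

    // collect the delta keys on the driver and broadcast them once
    val deltaKeys   = deltaDf.select("id").rdd.map(_.getString(0)).collect().toSet
    val deltaKeysBc = sc.broadcast(deltaKeys)

    // "not in delta" filter, i.e. the basic ! check from step 2 below
    val notInDelta = udf { key: String => !deltaKeysBc.value.contains(key) }

    val keptOldRows = oldDf.filter(notInDelta(oldDf("id")))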

Thanks,

On Fri, Jul 29, 2016 at 12:19 PM, Sumit Khanna <sumit.kha...@askme.in>
wrote:

> Hey,
>
> the very first run :
>
> glossary :
>
> delta_df := current run / execution changes dataframe.
>
> def deduplicate :
> apply a window function and group by so that only one row per deduplicating
> key remains
>
> def partitionDataframe(delta_df) :
> get the unique keys of that dataframe and return an array of dataframes,
> each containing only the rows for one key value.
> This gives the above dataframe partitioned by, say, a date column, gender
> column, age group column, etc. (sketches of both helpers follow).
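>
> A minimal sketch of the two helpers above; the column names "id" and
> "updated_at" and the partition column are placeholders for whatever the
> actual schema uses (on Spark 1.x the window function needs a HiveContext):
>
>     import org.apache.spark.sql.DataFrame
>     import org.apache.spark.sql.expressions.Window
>     import org.apache.spark.sql.functions.row_number
>
>     // keep the latest row per deduplicating key
>     def deduplicate(df: DataFrame): DataFrame = {
>       val w = Window.partitionBy("id").orderBy(df("updated_at").desc)
>       df.withColumn("rn", row_number().over(w)).filter("rn = 1").drop("rn")
>     }
>
>     // one dataframe per distinct value of the partition column
>     def partitionDataframe(df: DataFrame, partitionCol: String): Array[DataFrame] = {
>       val keys = df.select(partitionCol).distinct().rdd.map(_.get(0)).collect()
>       keys.map(k => df.filter(df(partitionCol) === k))
>     }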
>
> 0. deduplicate(delta_df) : delta_df [ with all rows unique on the primary /
> deduplicating key column ]
> 1. partitionDataframe(delta_df) : Array[delta_df(i), i = 1 to # partitions]
> 2. write each dataframe to the corresponding parent hdfs path + partition dir
> (see the write sketch below)
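>
> Step 2 of the first run then looks roughly like this (the base path and the
> "date" partition column are just examples):
>
>     val parentPath = "hdfs:///warehouse/my_table"   // example base path
>
>     partitionDataframe(deduplicate(deltaDf), "date").foreach { partDf =>
>       val partKey = partDf.select("date").first().get(0).toString
>       partDf.write.mode("overwrite").parquet(s"$parentPath/date=$partKey")
>     }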
>
> subsequent runs :
>
> for each partition :
> 0. partitionDataframe(delta_df) : Array[delta_df(i), i = 1 to # partitions]
> 1. load df(p) from the previous hdfs location of that partition
> 2. filter the above df(p), where p is the partition no., keeping only the keys
> not present in delta_df(p) of the current run, i.e. get df(p)[primary column]
> not in delta_df(p), done via a basic ! in a UDF
> 3. delta_df(p).unionAll(filtered df from 2.)
> 4. persist the output of 3. via df.write.mode(...).format(...)
> (a sketch of one such iteration follows)
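>
> A sketch of one iteration for a single partition p; partPath, the "id" key
> column and deltaDfP are placeholders, and writing to a temporary path first
> is an extra precaution since overwriting the same parquet directory that was
> just read within one job is unsafe:
>
>     import org.apache.spark.sql.functions.udf
>
>     val partPath = s"$parentPath/date=$partKey"       // previous run's output (1.)
>     val oldDf    = sqlContext.read.parquet(partPath)
>
>     // keys of the current delta for this partition, broadcast to the workers
>     val keysBc = sc.broadcast(
>       deltaDfP.select("id").rdd.map(_.getString(0)).collect().toSet)
>     val notInDelta = udf { id: String => !keysBc.value.contains(id) }
>
>     val keptOld  = oldDf.filter(notInDelta(oldDf("id")))  // (2.)
>     val upserted = deltaDfP.unionAll(keptOld)             // (3.)
>
>     // (4.) write to a temp dir first, then swap it into place
>     upserted.write.mode("overwrite").parquet(partPath + "_tmp")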
>
> Is this the right way of doing the upserts partition-wise? All in all it
> is taking 2 hours to insert / upsert 500K records in parquet format in an
> hdfs location where each location maps to one partition.
>
> My spark conf specs are :
>
> yarn cluster mode. single node.
> spark.executor.memory 8g
> spark.rpc.netty.dispatcher.numThreads 2
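>
> (For completeness, roughly how that conf can be set programmatically;
> spark-defaults.conf or --conf flags on spark-submit work just as well:)
>
>     import org.apache.spark.{SparkConf, SparkContext}
>
>     val conf = new SparkConf()
>       .setAppName("partition-wise-upsert")   // example app name
>       .set("spark.executor.memory", "8g")
>       .set("spark.rpc.netty.dispatcher.numThreads", "2")
>     val sc = new SparkContext(conf)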
>
> Thanks,
> Sumit
>
>
>
