Hey,

The very first run:

Glossary:

delta_df := the dataframe holding the changes from the current run / execution.

def deduplicate(delta_df) :
apply a windowing function and group by the primary / deduplicating key.
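
A minimal sketch of the deduplication idea, in plain Python with a list of dicts standing in for the dataframe. This is not the actual Spark job; the column names `id` and `updated_at`, and the rule "keep the latest row per key", are assumptions made only for illustration.

```python
def deduplicate(rows, key_col, order_col):
    """Keep one row per key: the row with the largest order_col value."""
    latest = {}
    for row in rows:
        key = row[key_col]
        # Same effect as ranking rows inside a window partitioned by key_col,
        # ordered by order_col descending, and keeping only the first row.
        if key not in latest or row[order_col] > latest[key][order_col]:
            latest[key] = row
    return list(latest.values())


# Hypothetical delta with a duplicated key (id = 1).
delta_df = [
    {"id": 1, "updated_at": 10, "name": "a"},
    {"id": 1, "updated_at": 20, "name": "b"},
    {"id": 2, "updated_at": 5, "name": "c"},
]
deduped = deduplicate(delta_df, key_col="id", order_col="updated_at")
```

After this step every value in the key column appears exactly once, which is what the later "not in" filter relies on.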

def partitionDataframe(delta_df) :
get the unique values of the partition column in that dataframe and then
return an array of dataframes, each containing only the rows that share one
of those values in that column.
This gives the above dataframe partitioned by, say, a date column, a
gender column, an age group column etc.
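
A rough sketch of that split, again in plain Python rather than Spark. The `date` column and the sample rows are hypothetical, and the result is returned as a dict keyed by partition value instead of an array so that each piece stays labelled.

```python
from collections import defaultdict


def partition_dataframe(rows, partition_col):
    """Split rows into one group per distinct value of partition_col."""
    groups = defaultdict(list)
    for row in rows:
        groups[row[partition_col]].append(row)
    return dict(groups)


# Hypothetical delta spanning two dates.
delta_df = [
    {"id": 1, "date": "2017-01-01"},
    {"id": 2, "date": "2017-01-02"},
    {"id": 3, "date": "2017-01-01"},
]
parts = partition_dataframe(delta_df, "date")
```

Each value in `parts` corresponds to one delta_df(p), and each key corresponds to one partition directory.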

0. deduplicate(delta_df) : delta_df [ with all values unique in the primary /
deduplicating key column ]
1. partitionDataframe(delta_df) : Array[delta_df(i to # partitions)]
2. write each dataframe to the corresponding parent HDFS path + partition dir.

Subsequent runs:

For each partition:
0. partitionDataframe(delta_df) : Array[delta_df(i to # partitions)]
1. load df from the previous HDFS location of that partition.
2. filter the above df(p), where p is the partition no., keeping only the
rows whose keys are not present in delta_df(p) of the current run, i.e. get
df(p)[primary column] not in delta_df(p). Done via a basic ! in UDF.
3. delta_df(p).unionAll(filtered df from step 2).
4. persist the output of step 3 using df.write.mode.format.
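
Steps 2 and 3 above can be sketched as follows. This is a plain-Python model under the assumption that each partition fits in a list of dicts; the function name `upsert_partition`, the `id` key column and the sample rows are hypothetical, and loading and persisting (steps 1 and 4) are left out.

```python
def upsert_partition(existing_rows, delta_rows, key_col):
    """Merge one partition: delta rows win over stored rows with the same key."""
    delta_keys = {row[key_col] for row in delta_rows}
    # Step 2: keep only the stored rows whose key is absent from the delta.
    untouched = [row for row in existing_rows if row[key_col] not in delta_keys]
    # Step 3: union of the delta and the filtered stored rows.
    return delta_rows + untouched


# Hypothetical contents of one partition before and during the current run.
existing = [{"id": 1, "name": "old"}, {"id": 2, "name": "keep"}]
delta = [{"id": 1, "name": "new"}, {"id": 3, "name": "added"}]
merged = upsert_partition(existing, delta, "id")
```

Here id 1 is updated, id 2 is carried over unchanged and id 3 is inserted, so the merged partition holds three rows.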

Is this the right way of doing the upserts partition-wise? All in all it is
taking 2 hours to insert / upsert 500K records in parquet format into
an HDFS location where each location gets mapped to one partition.

My Spark conf specs are:

YARN cluster mode, single node.
spark.executor.memory 8g
spark.rpc.netty.dispatcher.numThreads 2

Thanks,
Sumit
