I have a situation where I'm trying to add only new rows to an existing data set that lives in S3 as gzipped Parquet files, looping and appending for each hour of the day. First, I create a DF from the existing data, then I use a query to create another DF containing only the rows that are new. Here is the code snippet.
df = spark.read.parquet(existing_data_path)
df.createOrReplaceTempView('existing_data')

new_df = spark.read.parquet(new_data_path)
new_df.createOrReplaceTempView('new_data')

append_df = spark.sql("""
    WITH ids AS (
        SELECT DISTINCT source, source_id, target, target_id
        FROM new_data i
        LEFT ANTI JOIN existing_data im
            ON i.source = im.source
            AND i.source_id = im.source_id
            AND i.target = im.target
            AND i.target_id = im.target_id
    )
    SELECT * FROM ids
""")

append_df.coalesce(1).write.parquet(existing_data_path, mode='append', compression='gzip')

I thought this would append only the new rows and keep the data unique, but I am seeing many duplicates. Can someone help me with this and tell me what I am doing wrong? Thanks, Ben
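In case it helps, here is roughly the same logic written with the DataFrame API instead of Spark SQL. It is only a sketch of what I am trying to do, not code I have actually run; it assumes the same existing_data_path and new_data_path variables as above and that the four key columns are the full uniqueness key.

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Same data sets as above (existing_data_path / new_data_path assumed defined).
existing_df = spark.read.parquet(existing_data_path)
incoming_df = spark.read.parquet(new_data_path)

key_cols = ["source", "source_id", "target", "target_id"]

# Deduplicate the incoming batch itself, then keep only the rows whose key
# does not already exist in the current data set (left anti join).
append_df = (
    incoming_df
    .select(*key_cols)
    .dropDuplicates()
    .join(existing_df.select(*key_cols), on=key_cols, how="left_anti")
)

append_df.coalesce(1).write.parquet(
    existing_data_path, mode="append", compression="gzip"
)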