I have a situation where I'm trying to add only new rows to an existing data 
set that lives in S3 as gzipped Parquet files, looping and appending for each 
hour of the day. First, I create a DF from the existing data, then I use a 
query to create another DF containing only the data that is new. Here is the 
code snippet:

# Register both data sets as temp views so they can be compared in SQL
df = spark.read.parquet(existing_data_path)
df.createOrReplaceTempView('existing_data')
new_df = spark.read.parquet(new_data_path)
new_df.createOrReplaceTempView('new_data')
# Rows from new_data that do not already exist in existing_data
append_df = spark.sql(
    """
    WITH ids AS (
        SELECT DISTINCT
            source,
            source_id,
            target,
            target_id
        FROM new_data i
        LEFT ANTI JOIN existing_data im
        ON i.source = im.source
        AND i.source_id = im.source_id
        AND i.target = im.target
        AND i.target_id = im.target_id
    )
    SELECT * FROM ids
    """
)
append_df.coalesce(1).write.parquet(existing_data_path, mode='append',
                                    compression='gzip')
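
In case it is easier to read, here is the same anti-join written with the 
DataFrame API. This is only a sketch of what I believe is equivalent to the 
SQL above, reusing the df and new_df variables from the snippet:

# Sketch: keep only the rows of new_df whose key combination does not
# already appear in df (same logic as the SQL anti-join above)
key_cols = ['source', 'source_id', 'target', 'target_id']
append_df = (
    new_df.select(*key_cols)
          .distinct()
          .join(df.select(*key_cols), on=key_cols, how='left_anti')
)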

I thought this would append only the new rows and keep the data unique, but I 
am seeing many duplicates. Can someone help me with this and tell me what I am 
doing wrong?

Thanks,
Ben
