I tried a different tactic. I still append based on the query below, but I add 
another deduping step afterwards, writing the deduplicated result to a staging 
directory and then overwriting the original location with it. Luckily, the data 
is small enough for this to happen fast.
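
Roughly, the extra pass looks like this (just a sketch; staging_path stands in 
for my actual staging directory, and the dedupe columns are the same four from 
the query below):

# read the combined (appended) data and drop exact dupes on the key columns
combined = spark.read.parquet(existing_data_path)
deduped = combined.dropDuplicates(['source', 'source_id', 'target', 'target_id'])

# write to a staging directory first, then overwrite the original location
deduped.coalesce(1).write.parquet(staging_path, mode='overwrite', compression='gzip')
spark.read.parquet(staging_path) \
    .coalesce(1) \
    .write.parquet(existing_data_path, mode='overwrite', compression='gzip')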

Cheers,
Ben

> On Jun 3, 2018, at 3:02 PM, Tayler Lawrence Jones <t.jonesd...@gmail.com> 
> wrote:
> 
> Sorry, actually my last message is not true for anti join; I was thinking of 
> semi join. 
> 
> -TJ
> 
> On Sun, Jun 3, 2018 at 14:57 Tayler Lawrence Jones <t.jonesd...@gmail.com 
> <mailto:t.jonesd...@gmail.com>> wrote:
> A left join with a null filter is only the same as a left anti join if the join 
> keys can be guaranteed unique in the existing data. Since Hive tables on S3 
> offer no uniqueness guarantees outside of your processing code, I recommend 
> using a left anti join over a left join + null filter.
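> 
> For reference, the two shapes look roughly like this (a sketch only, reusing 
> the temp views from the snippet further down in this thread):
> 
> # left anti join: rows of new_data with no match in existing_data
> only_new = spark.sql("""
>     SELECT n.* FROM new_data n
>     LEFT ANTI JOIN existing_data e
>     ON n.source = e.source AND n.source_id = e.source_id
>     AND n.target = e.target AND n.target_id = e.target_id
> """)
> 
> # left join + null filter: keep only the rows where nothing on the right matched
> only_new_alt = spark.sql("""
>     SELECT n.* FROM new_data n
>     LEFT JOIN existing_data e
>     ON n.source = e.source AND n.source_id = e.source_id
>     AND n.target = e.target AND n.target_id = e.target_id
>     WHERE e.source IS NULL
> """)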
> 
> -TJ
> 
> On Sun, Jun 3, 2018 at 14:47 ayan guha <guha.a...@gmail.com 
> <mailto:guha.a...@gmail.com>> wrote:
> I do not use anti join semantics, but you can use a left outer join and then 
> filter for the rows where the right side is null. Your data may have dups on 
> the columns individually, but it should not have dups on the composite key, 
> i.e. all columns put together.
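> 
> A quick way to check for that (a rough sketch, using the column names from the 
> snippet below):
> 
> # look for dups on the composite key, i.e. all columns together
> (df.groupBy('source', 'source_id', 'target', 'target_id')
>     .count()
>     .filter('count > 1')
>     .show())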
> 
> On Mon, 4 Jun 2018 at 6:42 am, Tayler Lawrence Jones <t.jonesd...@gmail.com 
> <mailto:t.jonesd...@gmail.com>> wrote:
> The issue is not append vs overwrite - perhaps those responders do not know 
> anti join semantics. Further, overwrite on S3 is a bad pattern due to S3's 
> eventual consistency issues. 
> 
> First, your SQL query is wrong, as you don't close the parenthesis of the CTE 
> (the "with" part). In fact, it looks like you don't need that CTE at all, and 
> the query should fail to parse. If it does parse, I would open a bug on the 
> Spark JIRA.
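> 
> Something along these lines should express the same intent without the CTE (a 
> sketch only, based on your snippet; note the last join condition there compares 
> target to target_id, which looks like a typo):
> 
> append_df = spark.sql(
>     """
>     SELECT DISTINCT
>         i.source,
>         i.source_id,
>         i.target,
>         i.target_id
>     FROM new_data i
>     LEFT ANTI JOIN existing_data im
>     ON i.source = im.source
>     AND i.source_id = im.source_id
>     AND i.target = im.target
>     AND i.target_id = im.target_id
>     """
> )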
> 
> Can you provide the query that you are using to detect duplication so I can 
> see if your deduplication logic matches the detection query? 
> 
> -TJ
> 
> On Sat, Jun 2, 2018 at 10:22 Aakash Basu <aakash.spark....@gmail.com 
> <mailto:aakash.spark....@gmail.com>> wrote:
> As Jay correctly suggested, if you're joining then overwrite; otherwise only 
> append, as it removes dups.
> 
> I think, in this scenario, just change it to write.mode('overwrite') because 
> you're already reading the old data and your job would be done.
> 
> 
> On Sat 2 Jun, 2018, 10:27 PM Benjamin Kim, <bbuil...@gmail.com 
> <mailto:bbuil...@gmail.com>> wrote:
> Hi Jay,
> 
> Thanks for your response. Are you saying to append the new data and then 
> remove the duplicates from the whole data set afterwards, overwriting the 
> existing data set with the appended and deduplicated one? I will give that a 
> try. 
> 
> Cheers,
> Ben
> 
> On Fri, Jun 1, 2018 at 11:49 PM Jay <jayadeep.jayara...@gmail.com 
> <mailto:jayadeep.jayara...@gmail.com>> wrote:
> Benjamin,
> 
> The append will add the "new" data to the existing data without removing the 
> duplicates. You would need to overwrite the file every time if you need unique 
> values.
> 
> Thanks,
> Jayadeep
> 
> On Fri, Jun 1, 2018 at 9:31 PM Benjamin Kim <bbuil...@gmail.com 
> <mailto:bbuil...@gmail.com>> wrote:
> I have a situation where I am trying to add only new rows to an existing data 
> set that lives in S3 as gzipped Parquet files, looping and appending for each 
> hour of the day. First, I create a DF from the existing data, then I use a 
> query to create another DF with the data that is new. Here is the code 
> snippet.
> 
> df = spark.read.parquet(existing_data_path)
> df.createOrReplaceTempView('existing_data')
> new_df = spark.read.parquet(new_data_path)
> new_df.createOrReplaceTempView('new_data')
> append_df = spark.sql(
>         """
>         WITH ids AS (
>             SELECT DISTINCT
>                 source,
>                 source_id,
>                 target,
>                 target_id
>             FROM new_data i
>             LEFT ANTI JOIN existing_data im
>             ON i.source = im.source
>             AND i.source_id = im.source_id
>             AND i.target = im.target
>             AND i.target = im.target_id
>         """
>     )
> append_df.coalesce(1).write.parquet(existing_data_path, mode='append', 
> compression='gzip')
> 
> I thought this would append only the new rows and keep the data unique, but I 
> am seeing many duplicates. Can someone help me with this and tell me what I am 
> doing wrong?
> 
> Thanks,
> Ben
> -- 
> Best Regards,
> Ayan Guha
