I tried a different tactic. I still append based on the query below, but I add another deduping step afterwards, writing the deduplicated result to a staging directory and then overwriting it back to the original location. Luckily, the data is small enough for this to happen quickly.

Cheers,
Ben
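For reference, a minimal PySpark sketch of that extra step might look something like the following; the bucket paths, staging location, and key columns below are placeholders, not the actual job's values:

    from pyspark.sql import SparkSession

    spark = SparkSession.builder.getOrCreate()

    existing_data_path = 's3a://my-bucket/existing/'    # placeholder path
    staging_path = 's3a://my-bucket/existing_staging/'  # placeholder path
    key_cols = ['source', 'source_id', 'target', 'target_id']

    # After the append, read the combined data back, drop duplicate keys,
    # and write the result to a separate staging directory.
    combined = spark.read.parquet(existing_data_path)
    deduped = combined.dropDuplicates(key_cols)
    deduped.write.parquet(staging_path, mode='overwrite', compression='gzip')

    # Then overwrite the original location from the staged copy.
    spark.read.parquet(staging_path) \
        .write.parquet(existing_data_path, mode='overwrite', compression='gzip')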
> On Jun 3, 2018, at 3:02 PM, Tayler Lawrence Jones <t.jonesd...@gmail.com> wrote:
>
> Sorry, actually my last message is not true for anti join; I was thinking of semi join.
>
> -TJ
>
> On Sun, Jun 3, 2018 at 14:57 Tayler Lawrence Jones <t.jonesd...@gmail.com> wrote:
>
> A left join with a null filter is only the same as a left anti join if the join keys can be guaranteed unique in the existing data. Since Hive tables on S3 offer no uniqueness guarantees outside of your processing code, I recommend using a left anti join over a left join + null filter.
>
> -TJ
>
> On Sun, Jun 3, 2018 at 14:47 ayan guha <guha.a...@gmail.com> wrote:
>
> I do not use anti join semantics, but you can use a left outer join and then filter out nulls from the right side. Your data may have dups on the columns separately, but it should not have dups on the composite key, i.e. all columns put together.
>
> On Mon, 4 Jun 2018 at 6:42 am, Tayler Lawrence Jones <t.jonesd...@gmail.com> wrote:
>
> The issue is not append vs overwrite; perhaps those responders are not familiar with anti join semantics. Further, overwrite on S3 is a bad pattern due to S3's eventual-consistency issues.
>
> First, your SQL query is wrong, as you don't close the parenthesis of the CTE (the "WITH" part). In fact, it looks like you don't need that WITH at all, and the query should fail to parse. If it does parse, I would open a bug on the Spark JIRA.
>
> Can you provide the query that you are using to detect duplication, so I can see whether your deduplication logic matches the detection query?
>
> -TJ
>
> On Sat, Jun 2, 2018 at 10:22 Aakash Basu <aakash.spark....@gmail.com> wrote:
>
> As Jay correctly suggested, if you're joining then overwrite; otherwise only append, as it removes dups.
>
> I think, in this scenario, just change it to write.mode('overwrite'), because you're already reading the old data, and your job would be done.
>
> On Sat, 2 Jun 2018, 10:27 PM Benjamin Kim <bbuil...@gmail.com> wrote:
>
> Hi Jay,
>
> Thanks for your response. Are you saying to append the new data, then remove the duplicates from the whole data set afterwards, overwriting the existing data set with the new, appended data set? I will give that a try.
>
> Cheers,
> Ben
>
> On Fri, Jun 1, 2018 at 11:49 PM Jay <jayadeep.jayara...@gmail.com> wrote:
>
> Benjamin,
>
> The append will append the "new" data to the existing data without removing the duplicates. You would need to overwrite the file every time if you need unique values.
>
> Thanks,
> Jayadeep
>
> On Fri, Jun 1, 2018 at 9:31 PM Benjamin Kim <bbuil...@gmail.com> wrote:
>
> I have a situation where I am trying to add only new rows to an existing data set that lives in S3 as gzipped Parquet files, looping and appending for each hour of the day. First, I create a DF from the existing data, then I use a query to create another DF with the data that is new. Here is the code snippet:
> df = spark.read.parquet(existing_data_path)
> df.createOrReplaceTempView('existing_data')
> new_df = spark.read.parquet(new_data_path)
> new_df.createOrReplaceTempView('new_data')
> append_df = spark.sql(
>     """
>     WITH ids AS (
>         SELECT DISTINCT
>             source,
>             source_id,
>             target,
>             target_id
>         FROM new_data i
>         LEFT ANTI JOIN existing_data im
>             ON i.source = im.source
>             AND i.source_id = im.source_id
>             AND i.target = im.target
>             AND i.target = im.target_id
>     """
> )
> append_df.coalesce(1).write.parquet(existing_data_path, mode='append',
>                                     compression='gzip')
>
> I thought this would append new rows and keep the data unique, but I am seeing many duplicates. Can someone help me with this and tell me what I am doing wrong?
>
> Thanks,
> Ben
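For comparison, a sketch of the snippet with the two problems TJ points out addressed (the unneeded CTE removed, and the final join condition comparing i.target_id to im.target_id) might look roughly like this; it reuses the view, DataFrame, and column names from the snippet above and is illustrative only, not the poster's final code:

    append_df = spark.sql(
        """
        SELECT DISTINCT
            i.source,
            i.source_id,
            i.target,
            i.target_id
        FROM new_data i
        LEFT ANTI JOIN existing_data im
            ON i.source = im.source
            AND i.source_id = im.source_id
            AND i.target = im.target
            AND i.target_id = im.target_id
        """
    )

    # The same thing via the DataFrame API: a left anti join keeps only
    # new_data rows that have no match in existing_data.
    # append_df = new_df.join(df,
    #                         on=['source', 'source_id', 'target', 'target_id'],
    #                         how='left_anti').distinct()

    append_df.coalesce(1).write.parquet(existing_data_path, mode='append',
                                        compression='gzip')

Note that the DISTINCT only guards against duplicates within new_data; rows already duplicated in existing_data are left untouched, which is where the separate dedupe-and-overwrite step described at the top of the thread comes in.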