Re: Append In-Place to S3

2018-06-07 Thread Benjamin Kim
I tried a different tactic. I still append based on the query below, but I add 
another deduping step afterwards, writing to a staging directory and then 
overwriting back. Luckily, the data is small enough for this to happen quickly.
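
Roughly, the pattern looks like this (the paths and the composite key below are
placeholders rather than the exact production values; it assumes an active
SparkSession `spark` and the append_df produced by the query in the original post):

key_cols = ["source", "source_id", "target", "target_id"]
existing_data_path = "s3a://my-bucket/events/"        # placeholder
staging_path = "s3a://my-bucket/events_staging/"      # placeholder

# 1. Append the new rows as before.
append_df.coalesce(1).write.parquet(existing_data_path, mode="append", compression="gzip")

# 2. Re-read the full data set, dedup on the composite key, write to a staging directory.
deduped = spark.read.parquet(existing_data_path).dropDuplicates(key_cols)
deduped.write.parquet(staging_path, mode="overwrite", compression="gzip")

# 3. Overwrite the original location from the staged copy.
spark.read.parquet(staging_path).write.parquet(existing_data_path, mode="overwrite", compression="gzip")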

Cheers,
Ben




Re: Append In-Place to S3

2018-06-03 Thread Tayler Lawrence Jones
Sorry, actually my last message is not true for anti join; I was thinking of
semi join.

-TJ



Re: Append In-Place to S3

2018-06-03 Thread Tayler Lawrence Jones
A left join with a null filter is only the same as a left anti join if the
join keys are guaranteed unique in the existing data. Since Hive tables
on S3 offer no uniqueness guarantees outside of your processing code, I
recommend using a left anti join over a left join + null filter.
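
In DataFrame terms, a rough sketch of the anti join on the composite key
(using the df and new_df from the original post; the column list is taken
from that query):

key_cols = ["source", "source_id", "target", "target_id"]

# Keep only new_data rows whose composite key is absent from existing_data.
append_df = (new_df.select(*key_cols)
             .join(df.select(*key_cols), on=key_cols, how="left_anti")
             .distinct())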

-TJ



Re: Append In-Place to S3

2018-06-03 Thread ayan guha
I do not use anti join semantics, but you can use a left outer join and then
filter out nulls from the right side. Your data may have dups in the columns
individually, but it should not have dups on the composite key, i.e. all columns
put together.
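
For illustration, a rough sketch of that left outer join + null filter
(column names from the original query; the existing side's keys are renamed
and deduped so the null check is unambiguous and duplicate existing keys do
not fan out):

from pyspark.sql import functions as F

key_cols = ["source", "source_id", "target", "target_id"]

# New keys, deduped, and existing keys renamed with a prefix for the null check.
new_keys = new_df.select(*key_cols).distinct()
existing_keys = df.select(*[F.col(c).alias("ex_" + c) for c in key_cols]).distinct()

join_cond = [new_keys[c] == existing_keys["ex_" + c] for c in key_cols]

append_df = (new_keys
             .join(existing_keys, on=join_cond, how="left_outer")
             .filter(F.col("ex_source").isNull())   # keep keys with no match on the existing side
             .select(*key_cols))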

 --
Best Regards,
Ayan Guha


Re: Append In-Place to S3

2018-06-03 Thread Tayler Lawrence Jones
The issue is not append vs. overwrite - perhaps those responders do not
know anti join semantics. Further, overwrite on S3 is a bad pattern due to
S3 eventual consistency issues.

First, your SQL query is wrong, as you don't close the parenthesis of the
CTE (the "WITH" part). In fact, it looks like you don't need that CTE at all,
and the query should fail to parse. If it does parse, I would open a bug
on the Spark JIRA.
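
For what it's worth, with the WITH wrapper dropped, the query might look
roughly like the sketch below. This is a guess at the intent, not a verified
fix; note also that the last predicate in the original compares i.target to
im.target_id, which is changed here on the assumption that it was a typo.

append_df = spark.sql("""
    SELECT DISTINCT
        i.source,
        i.source_id,
        i.target,
        i.target_id
    FROM new_data i
    LEFT ANTI JOIN existing_data im
        ON  i.source    = im.source
        AND i.source_id = im.source_id
        AND i.target    = im.target
        AND i.target_id = im.target_id
""")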

Can you provide the query that you are using to detect duplication so I can
see if your deduplication logic matches the detection query?

-TJ



Re: Append In-Place to S3

2018-06-02 Thread Aakash Basu
As Jay correctly suggested, if you're joining then overwrite; otherwise only
append, as it removes dups.

I think, in this scenario, just change it to write.mode('overwrite'),
because you're already reading the old data, and your job would be done.
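
As a rough sketch of what that could look like, assuming the DataFrame being
written holds the full, deduplicated data set rather than only the new rows,
and assuming a placeholder staging path (overwriting the same S3 path that df
is still lazily reading from is not safe):

key_cols = ["source", "source_id", "target", "target_id"]

# Hypothetical full result: existing + new, deduped on the composite key.
full_df = df.select(*key_cols).union(new_df.select(*key_cols)).dropDuplicates(key_cols)

# Write to a staging path first; copying back over the original is a separate step.
full_df.coalesce(1).write.mode("overwrite").parquet("s3a://my-bucket/events_staging/",
                                                    compression="gzip")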




Re: Append In-Place to S3

2018-06-02 Thread Benjamin Kim
Hi Jay,

Thanks for your response. Are you saying to append the new data and then
remove the duplicates from the whole data set afterwards, overwriting the
existing data set with the new, appended one? I will give that a try.

Cheers,
Ben



Re: Append In-Place to S3

2018-06-02 Thread vincent gromakowski
Structured Streaming can provide idempotent and exactly-once writes in
Parquet, but I don't know how it does it under the hood.
Without this, you need to load your whole dataset, then dedup, then write back
the entire dataset. This overhead can be minimized by partitioning the
output files.
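
A minimal sketch of the kind of job that could look like; the paths, schema
and in-stream dedup are assumptions on my part, not something tied to the
original pipeline, and the file sink gets its exactly-once behaviour from the
checkpoint location:

from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType

spark = SparkSession.builder.appName("stream-append").getOrCreate()

schema = StructType([
    StructField("source", StringType()),
    StructField("source_id", StringType()),
    StructField("target", StringType()),
    StructField("target_id", StringType()),
])

# Treat a landing directory of Parquet files as the stream source (placeholder path).
stream = spark.readStream.schema(schema).parquet("s3a://my-bucket/incoming/")

# Dedup on the composite key; a real job would usually add a watermark to bound the state.
query = (stream.dropDuplicates(["source", "source_id", "target", "target_id"])
         .writeStream
         .format("parquet")
         .option("path", "s3a://my-bucket/events/")                     # placeholder
         .option("checkpointLocation", "s3a://my-bucket/checkpoints/")  # placeholder
         .outputMode("append")
         .start())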



Re: Append In-Place to S3

2018-06-02 Thread Jay
Benjamin,

Append will add the "new" data to the existing data without removing
the duplicates. You would need to overwrite the file every time if you need
unique values.
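
A tiny local demonstration of that behaviour (hypothetical /tmp path, and an
active SparkSession `spark` assumed): Parquet 'append' just adds files, it
never dedups.

rows = [("a", 1, "b", 2)]
toy = spark.createDataFrame(rows, ["source", "source_id", "target", "target_id"])

toy.write.parquet("/tmp/append_demo", mode="overwrite")
toy.write.parquet("/tmp/append_demo", mode="append")    # write the same row again

print(spark.read.parquet("/tmp/append_demo").count())   # prints 2, not 1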

Thanks,
Jayadeep



Append In-Place to S3

2018-06-01 Thread Benjamin Kim
I have a situation where I am trying to add only new rows to an existing data set 
that lives in S3 as gzipped Parquet files, looping and appending for each hour 
of the day. First, I create a DF from the existing data, then I use a query to 
create another DF with the data that is new. Here is the code snippet.

df = spark.read.parquet(existing_data_path)
df.createOrReplaceTempView('existing_data')
new_df = spark.read.parquet(new_data_path)
new_df.createOrReplaceTempView('new_data')
append_df = spark.sql(
    """
    WITH ids AS (
        SELECT DISTINCT
            source,
            source_id,
            target,
            target_id
        FROM new_data i
        LEFT ANTI JOIN existing_data im
            ON i.source = im.source
            AND i.source_id = im.source_id
            AND i.target = im.target
            AND i.target = im.target_id
    """
)
append_df.coalesce(1).write.parquet(existing_data_path, mode='append', compression='gzip')

I thought this would append new rows and keep the data unique, but I am seeing 
many duplicates. Can someone help me with this and tell me what I am doing 
wrong?

Thanks,
Ben