Umesh Kant created SPARK-43818:
----------------------------------

             Summary: Spark Glue job introduces duplicates while writing a DataFrame to S3 as files
                 Key: SPARK-43818
                 URL: https://issues.apache.org/jira/browse/SPARK-43818
             Project: Spark
          Issue Type: Bug
          Components: PySpark
    Affects Versions: 3.1.0
         Environment: Production
            Reporter: Umesh Kant


We have an AWS Glue (Spark) based ETL framework that processes data through multiple hops
and finally writes the DataFrame to an S3 bucket as Parquet files with Snappy compression.
We have used this framework to process and write data to S3 for 1000+ tables/files and it
works fine. But for two of the tables, the in-memory DataFrame contains the correct records,
yet once the DataFrame is persisted to S3 as files it contains duplicate entries; and because
the total row count stays the same, the duplicates also mean records go missing.
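
For reference, the final persist step is essentially a plain Parquet write with Snappy compression. A minimal sketch of that step, where df stands for the in-memory DataFrame that holds the correct records, and the S3 path, write mode and partition column are placeholders rather than the actual job configuration:

{code:python}
# Minimal sketch of the final persist step; path, mode and partition column are placeholders.
(df.write
   .mode("overwrite")
   .option("compression", "snappy")
   .partitionBy("part_num")
   .parquet("s3://<bucket>/<structured-path>/<table>/"))
{code}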

{*}Data Points{*}:
 # This happens only for large tables (wide schemas with millions of rows).
 # When it happens we observe stage failures; the retried stages succeed, but the persisted output ends up with duplicate/missing records. A sketch of the check we run to confirm this follows this list.
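
The duplicates are visible only in the persisted copy, so the check below is roughly what we run to confirm the mismatch. It is a minimal sketch: keys stands for the natural-key column list, the S3 path is a placeholder, and df_active is the in-memory DataFrame built in the code steps below.

{code:python}
from pyspark.sql import functions as fn

# Hypothetical verification sketch: compare the in-memory DataFrame with what landed in S3.
# `keys` and the S3 path are placeholders for the actual key columns / write location.
persisted = spark.read.parquet("s3://<bucket>/<structured-path>/<table>/")

# Total counts match...
print(df_active.count(), persisted.count())

# ...but the persisted copy contains duplicated keys,
dup_keys = persisted.groupBy(keys).count().where(fn.col("count") > 1)
dup_keys.show(truncate=False)

# ...and, because the total is unchanged, the corresponding records are missing.
missing = (df_active.select(keys).distinct()
                    .subtract(persisted.select(keys).distinct()))
missing.show(truncate=False)
{code}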

{*}Code Steps{*}:
|Steps Information|DataFrame|Query / Operation / Action|
|Query raw DB & get the number of partitions (to loop over one by one)| |SELECT DISTINCT partition_0 FROM <raw db>.<table name>|
|Raw DF Query|raw|select SCHDWKID_REF, TASK_REF, LIFECYCLE_REF, TASK_DESC, 
WHOSE_ENT_NAME, WHOSE_INST_REF, WHOSE_INST_CDE, STENDDAT_STRTDT, 
STENDDAT_ENDDAT, AGENT_ENT_NAME, AGENT_INST_REF, AGENT_INST_CDE, AGENT_CODE, 
LOCATION_ENT_NAME, LOCATION_INST_REF, LOCATION_INST_CDE, CASEID_NUMBER, 
FACE_AMT, TAAR_AMT, AUTH_AMT, TRANSFER_YORN_ENCODE, TRANSFER_YORN_DECODE, 
TRANSFER_YORN_ELMREF, CASE_YORN_ENCODE, CASE_YORN_DECODE, CASE_YORN_ELMREF, 
CHANGEID_REF, CNTRCTID_REF, CNTRCTID_NUMBER, KTKDSCID_REF, KWNOFFID_REF, 
KWNOFFID_CODE, USERID_REF, USERID_CODE, WQUEUEID_REF, WQUEUEID_CODE, 
STATUS_REF, STATUS_CODE, STATUS_ASAT, LASTUPD_USER, LASTUPD_TERMNO, 
LASTUPD_PROG, LASTUPD_INFTIM, KWQPRIID_REF, KWQPRIID_CODE, INSURED_NAME, 
AGENT_NAME, EDM_INGESTED_AT, EDM_INGEST_TIME, PARTITION_0, DELTA_IND, 
TRANSACT_SEQ from RAW_ORACLE_ORAP12_NYLDPROD60CL.SCHEDULED_WORK where 
partition_0= '20230428'|
|Structured DF Query|structured|SELECT * FROM RL_LAKE_ORACLE_ORAP12_NYLDPROD60CL.SCHEDULED_WORK WHERE part_num > 0|
| | | |
|Merge DF generated by joining raw & structured on the natural keys|df_merge|df_merge = structured.join(raw, keys, how='fullouter')|
|Action column added to the merge DF|df_merge|df_merge = df_merge.withColumn("action",
fn.when((((df_merge['structured.EDH_RECORD_STATUS_IN'] == 'A') \| (df_merge['structured.EDH_RECORD_STATUS_IN'] == 'D')) & (df_merge['raw.chksum'].isNull()) & (~ df_merge['structured.CHKSUM'].isNull())), "NOACTION")
.when((df_merge['structured.CHKSUM'].isNull()) & (df_merge['raw.delta_ind'] != 'D'), "INSERT")
.when((df_merge['structured.CHKSUM'] != df_merge['raw.chksum']) & (~ df_merge['structured.CHKSUM'].isNull()) & (df_merge['structured.EDH_RECORD_STATUS_IN'] == 'A') & ((df_merge['raw.delta_ind'] == 'U') \| (df_merge['raw.delta_ind'] == 'I')), "UPDATE")
.when(((df_merge['raw.delta_ind'] == 'D') & (df_merge['structured.EDH_RECORD_STATUS_IN'] == 'A')), "DELETE")
.when(((df_merge['raw.delta_ind'] == 'D') & (df_merge['structured.EDH_RECORD_STATUS_IN'] == 'D')), "DELETECOPY")
.when(((df_merge['raw.delta_ind'] == 'I') & (df_merge['structured.EDH_RECORD_STATUS_IN'] == 'D') & (~ df_merge['raw.chksum'].isNull()) & (~ df_merge['structured.CHKSUM'].isNull())), "DELETEREINSERT")
.when(((df_merge['raw.delta_ind'] == 'D') & (df_merge['structured.CHKSUM'].isNull())), "DELETEABSENT")
.when((df_merge['structured.CHKSUM'] == df_merge['raw.chksum']), "NOCHANGE"))|
| | | |
|No-action DF derived from the merge DF|df_noaction|df_noaction = df_merge.select(keys + ['structured.' + x.upper() for x in structured_cols_list if x.upper() not in keys]).where((df_merge.action == 'NOACTION') \| (df_merge.action == 'NOCHANGE'))|
|Delete-copy DF derived|df_dcopy|df_dcopy = df_merge.select(keys + ['structured.' + x.upper() for x in structured_cols_list if x.upper() not in keys]).where(df_merge.action == 'DELETECOPY')|
|Delete-absent DF derived|df_dabs|df_dabs = df_merge.select(keys + ['raw.' + x.upper() for x in raw_cols_list if x.upper() not in keys]).where(df_merge.action == 'DELETEABSENT')|
|Insert DF derived|df_insert|df_insert = df_merge.select(keys + ['raw.' + x.upper() for x in raw_cols_list if x.upper() not in keys]).where(df_merge.action == 'INSERT')|
|Outdated DF derived (records from structured for which we have updates)|df_outdated|df_outdated = df_merge.select(keys + ['structured.' + x.upper() for x in structured_cols_list if x.upper() not in keys]).where(df_merge.action == 'UPDATE')|
|Deleted-records DF derived (inactive)|df_delrecIn|df_delrecIn = df_merge.select(keys + ['structured.' + x.upper() for x in structured_cols_list if x.upper() not in keys]).where(df_merge.action == 'DELETE')|
|Deleted-records DF derived (active)|df_delrecAc|df_delrecAc = df_merge.select(keys + ['structured.' + x.upper() for x in structured_cols_list if x.upper() not in keys]).where(df_merge.action == 'DELETE')|
|Deleted & re-inserted records DF derived (inactive)|df_delReInsertInactive|df_delReInsertInactive = df_merge.select(keys + ['structured.' + x.upper() for x in structured_cols_list if x.upper() not in keys]).where(df_merge.action == 'DELETEREINSERT')|
|Deleted & re-inserted records DF derived (active)|df_delReInsertActive|df_delReInsertActive = df_merge.select(keys + ['raw.' + x.upper() for x in raw_cols_list if x.upper() not in keys]).where(df_merge.action == 'DELETEREINSERT')|
|Updated-records DF derived|df_update|df_update = df_merge.select(keys + ['raw.' + x.upper() for x in raw_cols_list if x.upper() not in keys]).where(df_merge.action == 'UPDATE')|
| | | |
|Updated active DF by unioning all active DFs generated previously|df_active|df_active = df_insert.unionAll(df_update).unionAll(df_delrecAc).unionAll(df_delReInsertActive).unionAll(df_dabs)|
|We cache the DF into memory|df_active|df_active=df_active.cache()|
|Keep a copy of df_active for the debug tables in df_active_before_index|df_active_before_index|df_active_before_index = df_active|
|Add sk (surrogate key) column values to the active DF|df_active|df_active=dfZipWithIndex(df_active,skey_last_val,sk_col_nme,'part_num1',int(part_cnt),load_type)|
|We cache the DF into memory again|df_active|df_active = df_active.cache()|
|Create a new DF from the active rows of the no-action & delete-copy DFs|df_active2|df_active2=df_noaction.unionAll(df_dcopy)|
|Union of the previously generated active2 & active DFs|df_active|df_active=df_active.unionAll(df_active2)|
|Updated inactive DF|df_inactive_final|df_inactive_final = df_outdated.unionAll(df_delrecIn).unionAll(df_delReInsertInactive)|
|Adding to debug tables|df_merge, df_active_final, df_inactive_final, raw, structured, df_active2, df_active_before_index|These debug-table steps were added to troubleshoot the duplicate/missing records issue. The (external) tables can be queried.|
|Debug tables we can query|scheduled_work_active_bindex, scheduled_work_active, scheduled_work_active2, scheduled_work_before_write_active, scheduled_work_inactive, scheduled_work_merge, scheduled_work_raw, scheduled_work_structured|These debug-table steps were added to troubleshoot the duplicate/missing records issue. The (external) tables can be queried.|
|Change audit data types of the active & inactive DFs| |change_audit_datatype(fhz_df,structured_audit_col_names)|
|Active data set: nulls replaced with blanks|df|df=df.fillna('')|
|Active data set column renaming| |conf_df=df_col_rename(df, raw_all_col_n_partcol, structured_all_col_names)|
|Trim function applied to the varchar columns of the active data set| |conf_df = conf_df.withColumn(i[0], fn.trim(conf_df[i[0]]))|
|Active data set column renaming| |df = df.withColumnRenamed("PART_NUM","part_num")|
|Repartitioning the dataset based on the number of DPUs|df|df=df.repartition(no_of_dpu)|
|Dropping unused columns from the active dataset|df| |
|Delete partitions| |del_partition|
|Delete active files| |del_File(bucket_name,del_path, part_col_cnt)|
|Write the active data set (the same data that is written to the S3 structured path) to debug tables|df (debug table name = scheduled_work_before_write_active)|create_temp_table(df_dict, final_tbl, tbl_path, str_dir_path,'ldap')|
|Write the active DF to the structured layer (a condensed sketch of these final steps follows the table)|df|wrt_df|
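
To make the tail of the flow above easier to follow outside the table, here is a condensed PySpark sketch of the steps between the merge and the final write. It is a simplification: dfZipWithIndex is our own helper for assigning surrogate-key values, and the stand-in below (based on RDD zipWithIndex) only illustrates the shape of that step, not its exact implementation; the final write (wrt_df in the table) is likewise shown as a plain Parquet write with placeholder path and mode, and the structured-side DataFrames are assumed to already carry the surrogate-key column so the unions line up.

{code:python}
# Condensed sketch of the post-merge steps; names follow the table above, details simplified.

# 1. Active set: union of everything that must end up in the structured layer.
df_active = (df_insert.unionAll(df_update)
                      .unionAll(df_delrecAc)
                      .unionAll(df_delReInsertActive)
                      .unionAll(df_dabs))
df_active = df_active.cache()
df_active_before_index = df_active            # kept for the debug tables

# 2. Surrogate-key assignment. Simplified stand-in for our dfZipWithIndex() helper:
#    zip each row with an index and offset it by the last used surrogate-key value.
def df_zip_with_index_sketch(df, last_val, sk_col):
    rdd = df.rdd.zipWithIndex()
    return rdd.map(lambda pair: tuple(pair[0]) + (pair[1] + last_val + 1,)) \
              .toDF(df.columns + [sk_col])

df_active = df_zip_with_index_sketch(df_active, skey_last_val, sk_col_nme)
df_active = df_active.cache()

# 3. Add back the unchanged rows, then build the inactive set.
df_active2 = df_noaction.unionAll(df_dcopy)
df_active = df_active.unionAll(df_active2)
df_inactive_final = df_outdated.unionAll(df_delrecIn).unionAll(df_delReInsertInactive)

# 4. Final clean-up and write; mode, path and partitioning are placeholders.
df = df_active.fillna('').repartition(no_of_dpu)
df = df.withColumnRenamed("PART_NUM", "part_num")
(df.write
   .mode("overwrite")
   .option("compression", "snappy")
   .parquet("s3://<bucket>/<structured-path>/<table>/"))
{code}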



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
