Yuming Wang updated SPARK-43818:
--------------------------------
    Language:   (was: English)

Spark Glue job introduces duplicates while writing a dataframe as file to S3
-----------------------------------------------------------------------------

                Key: SPARK-43818
                URL: https://issues.apache.org/jira/browse/SPARK-43818
            Project: Spark
         Issue Type: Bug
         Components: PySpark
   Affects Versions: 3.1.0
        Environment: Production
           Reporter: Umesh Kant
           Priority: Major

We have an AWS Glue (Spark) based ETL framework that processes data through multiple hops and finally writes the dataframe to an S3 bucket as Parquet files with Snappy compression. We have used this framework to process and write data to S3 for 1000+ tables/files and it works fine. But for two of the tables, the in-memory dataframe contains the correct records, yet once the dataframe is persisted to S3 the files contain duplicate entries; because the total row count stays the same, the duplicates also mean some records go missing.

*Data Points:*
# This happens only for large data sets (wide tables with millions of rows).
# When this happens we notice stage failures; the retried stages succeed, but they cause the duplicate/missing records.

*Code Steps:*
||Step||Dataframe||Query / Operation / Action||
|Query the raw DB and get the distinct partitions (to loop over one by one)| |select distinct partition_0 FROM <raw db>.<table name>|
|Raw DF query|raw|select SCHDWKID_REF, TASK_REF, LIFECYCLE_REF, TASK_DESC, WHOSE_ENT_NAME, WHOSE_INST_REF, WHOSE_INST_CDE, STENDDAT_STRTDT, STENDDAT_ENDDAT, AGENT_ENT_NAME, AGENT_INST_REF, AGENT_INST_CDE, AGENT_CODE, LOCATION_ENT_NAME, LOCATION_INST_REF, LOCATION_INST_CDE, CASEID_NUMBER, FACE_AMT, TAAR_AMT, AUTH_AMT, TRANSFER_YORN_ENCODE, TRANSFER_YORN_DECODE, TRANSFER_YORN_ELMREF, CASE_YORN_ENCODE, CASE_YORN_DECODE, CASE_YORN_ELMREF, CHANGEID_REF, CNTRCTID_REF, CNTRCTID_NUMBER, KTKDSCID_REF, KWNOFFID_REF, KWNOFFID_CODE, USERID_REF, USERID_CODE, WQUEUEID_REF, WQUEUEID_CODE, STATUS_REF, STATUS_CODE, STATUS_ASAT, LASTUPD_USER, LASTUPD_TERMNO, LASTUPD_PROG, LASTUPD_INFTIM, KWQPRIID_REF, KWQPRIID_CODE, INSURED_NAME, AGENT_NAME, EDM_INGESTED_AT, EDM_INGEST_TIME, PARTITION_0, DELTA_IND, TRANSACT_SEQ from RAW_ORACLE_ORAP12_NYLDPROD60CL.SCHEDULED_WORK where partition_0 = '20230428'|
|Structured DF query|structured|SELECT * FROM RL_LAKE_ORACLE_ORAP12_NYLDPROD60CL.SCHEDULED_WORK WHERE part_num > 0|
|Merge DF generated by joining raw & structured on the natural keys|df_merge|df_merge = structured.join(raw, keys, how='fullouter')|
|Action column added to the merge df|df_merge|df_merge = df_merge.withColumn("action", ...); the full expression is in the code block below the table|

{code:python}
df_merge = df_merge.withColumn("action",
    fn.when((((df_merge['structured.EDH_RECORD_STATUS_IN'] == 'A')
              | (df_merge['structured.EDH_RECORD_STATUS_IN'] == 'D'))
             & (df_merge['raw.chksum'].isNull())
             & (~df_merge['structured.CHKSUM'].isNull())), "NOACTION")
      .when((df_merge['structured.CHKSUM'].isNull())
            & (df_merge['raw.delta_ind'] != 'D'), "INSERT")
      .when((df_merge['structured.CHKSUM'] != df_merge['raw.chksum'])
            & (~df_merge['structured.CHKSUM'].isNull())
            & (df_merge['structured.EDH_RECORD_STATUS_IN'] == 'A')
            & ((df_merge['raw.delta_ind'] == 'U')
               | (df_merge['raw.delta_ind'] == 'I')), "UPDATE")
      .when(((df_merge['raw.delta_ind'] == 'D')
             & (df_merge['structured.EDH_RECORD_STATUS_IN'] == 'A')), "DELETE")
      .when(((df_merge['raw.delta_ind'] == 'D')
             & (df_merge['structured.EDH_RECORD_STATUS_IN'] == 'D')), "DELETECOPY")
      .when(((df_merge['raw.delta_ind'] == 'I')
             & (df_merge['structured.EDH_RECORD_STATUS_IN'] == 'D')
             & (~df_merge['raw.chksum'].isNull())
             & (~df_merge['structured.CHKSUM'].isNull())), "DELETEREINSERT")
      .when(((df_merge['raw.delta_ind'] == 'D')
             & (df_merge['structured.CHKSUM'].isNull())), "DELETEABSENT")
      .when((df_merge['structured.CHKSUM'] == df_merge['raw.chksum']), "NOCHANGE"))
{code}
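For context, the join-then-classify step above can be reproduced in miniature. The sketch below is hypothetical (toy data, one key column, and a reduced action set); it is not the reporter's job, only an illustration of the fullouter-join pattern the framework uses:

{code:python}
# Minimal sketch of the merge/classification pattern (toy data; column names
# follow the reporter's schema, the action set is reduced for brevity).
from pyspark.sql import SparkSession
import pyspark.sql.functions as fn

spark = SparkSession.builder.appName("cdc-merge-sketch").getOrCreate()

keys = ["TASK_REF"]
raw = spark.createDataFrame(
    [(1, "c1", "I"), (2, "c2x", "U"), (3, "c3", "D")],
    ["TASK_REF", "chksum", "delta_ind"])
structured = spark.createDataFrame(
    [(2, "c2", "A"), (3, "c3", "A"), (4, "c4", "A")],
    ["TASK_REF", "CHKSUM", "EDH_RECORD_STATUS_IN"])

# Full outer join on the natural keys, then classify every resulting row.
df_merge = structured.alias("structured").join(raw.alias("raw"), keys, "fullouter")
df_merge = df_merge.withColumn(
    "action",
    fn.when(fn.col("structured.CHKSUM").isNull()
            & (fn.col("raw.delta_ind") != "D"), "INSERT")
      .when((fn.col("raw.delta_ind") == "D")
            & (fn.col("structured.EDH_RECORD_STATUS_IN") == "A"), "DELETE")
      .when((fn.col("structured.CHKSUM") != fn.col("raw.chksum"))
            & fn.col("structured.CHKSUM").isNotNull(), "UPDATE")
      .otherwise("NOCHANGE"))
df_merge.show()
{code}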
||Step||Dataframe||Query / Operation / Action||
|No-action df derived from the merge df|df_noaction|df_noaction = df_merge.select(keys + ['structured.' + x.upper() for x in structured_cols_list if x.upper() not in keys]).where((df_merge.action == 'NOACTION') \| (df_merge.action == 'NOCHANGE'))|
|Delete-copy df derived|df_dcopy|df_dcopy = df_merge.select(keys + ['structured.' + x.upper() for x in structured_cols_list if x.upper() not in keys]).where(df_merge.action == 'DELETECOPY')|
|Delete-absent df derived|df_dabs|df_dabs = df_merge.select(keys + ['raw.' + x.upper() for x in raw_cols_list if x.upper() not in keys]).where(df_merge.action == 'DELETEABSENT')|
|Insert df derived|df_insert|df_insert = df_merge.select(keys + ['raw.' + x.upper() for x in raw_cols_list if x.upper() not in keys]).where(df_merge.action == 'INSERT')|
|Outdated df derived (records from structured for which we have updates)|df_outdated|df_outdated = df_merge.select(keys + ['structured.' + x.upper() for x in structured_cols_list if x.upper() not in keys]).where(df_merge.action == 'UPDATE')|
|Deleted records derived (inactive)|df_delrecIn|df_delrecIn = df_merge.select(keys + ['structured.' + x.upper() for x in structured_cols_list if x.upper() not in keys]).where(df_merge.action == 'DELETE')|
|Deleted records derived (active)|df_delrecAc|df_delrecAc = df_merge.select(keys + ['structured.' + x.upper() for x in structured_cols_list if x.upper() not in keys]).where(df_merge.action == 'DELETE')|
|Deleted & re-inserted records derived (inactive)|df_delReInsertInactive|df_delReInsertInactive = df_merge.select(keys + ['structured.' + x.upper() for x in structured_cols_list if x.upper() not in keys]).where(df_merge.action == 'DELETEREINSERT')|
|Deleted & re-inserted records derived (active)|df_delReInsertActive|df_delReInsertActive = df_merge.select(keys + ['raw.' + x.upper() for x in raw_cols_list if x.upper() not in keys]).where(df_merge.action == 'DELETEREINSERT')|
|Updated records df|df_update|df_update = df_merge.select(keys + ['raw.' + x.upper() for x in raw_cols_list if x.upper() not in keys]).where(df_merge.action == 'UPDATE')|
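All of the per-action selects above follow one pattern: keep the key columns plus one side's columns and filter on the action column. Continuing the toy df_merge from the earlier sketch, a compact helper could look like the following (split_by_action is a hypothetical name, not part of the framework, and the two column lists stand in for the real ones):

{code:python}
# Hypothetical helper mirroring the reporter's select/where pattern.
# structured_cols_list / raw_cols_list stand in for the real column lists.
structured_cols_list = ["CHKSUM", "EDH_RECORD_STATUS_IN"]
raw_cols_list = ["chksum", "delta_ind"]

def split_by_action(df, side, cols, keys, actions):
    # Key columns plus the chosen side's non-key columns, restricted to actions.
    picked = keys + [side + "." + c for c in cols if c.upper() not in keys]
    return df.where(df.action.isin(actions)).select(picked)

df_insert = split_by_action(df_merge, "raw", raw_cols_list, keys, ["INSERT"])
df_noaction = split_by_action(df_merge, "structured", structured_cols_list, keys,
                              ["NOACTION", "NOCHANGE"])
{code}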
||Step||Dataframe||Query / Operation / Action||
|Active df built by unioning all of the active dfs generated previously|df_active|df_active = df_insert.unionAll(df_update).unionAll(df_delrecAc).unionAll(df_delReInsertActive).unionAll(df_dabs)|
|Cache the df in memory|df_active|df_active = df_active.cache()|
|Write df_active to the debug tables by storing it as df_active_before_index|df_active_before_index|df_active_before_index = df_active|
|Add surrogate-key column values to the active df|df_active|df_active = dfZipWithIndex(df_active, skey_last_val, sk_col_nme, 'part_num1', int(part_cnt), load_type)|
|Cache the df in memory|df_active|df_active = df_active.cache()|
|New df built from the no-action & delete-copy dfs|df_active2|df_active2 = df_noaction.unionAll(df_dcopy)|
|Union of the previously generated active2 & active dfs|df_active|df_active = df_active.unionAll(df_active2)|
|Updated inactive df|df_inactive_final|df_inactive_final = df_outdated.unionAll(df_delrecIn).unionAll(df_delReInsertInactive)|
|Write to the debug tables|df_merge, df_active_final, df_inactive_final, raw, structured, df_active2, df_active_before_index|These debug-table steps were added to troubleshoot the duplicate/missing-records issue. The tables (external) can be queried.|
|Debug tables we can query|scheduled_work_active_bindex, scheduled_work_active, scheduled_work_active2, scheduled_work_before_write_active, scheduled_work_inactive, scheduled_work_merge, scheduled_work_raw, scheduled_work_structured|Same purpose as above.|
|Change audit data types of the active & inactive dfs| |change_audit_datatype(fhz_df, structured_audit_col_names)|
|Active data set: nulls replaced with blanks|df|df = df.fillna('')|
|Active data set: column renaming| |conf_df = df_col_rename(df, raw_all_col_n_partcol, structured_all_col_names)|
|For varchar columns, a trimming function is applied to the active data set| |conf_df = conf_df.withColumn(i[0], fn.trim(conf_df[i[0]]))|
|Active data set: column renaming| |df = df.withColumnRenamed("PART_NUM", "part_num")|
|Repartition the data set based on the number of DPUs|df|df = df.repartition(no_of_dpu)|
|Drop unused columns from the active data set|df| |
|Delete partitions| |del_partition|
|Delete active files| |del_File(bucket_name, del_path, part_col_cnt)|
|Write the active data set (the same data that is written to the S3 structured path) to the debug tables|df (debug table name = scheduled_work_before_write_active)|create_temp_table(df_dict, final_tbl, tbl_path, str_dir_path, 'ldap')|
|Write the active df to the structured layer|df|wrt_df|
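The dfZipWithIndex helper in the surrogate-key step is not shown in the report. A common way to build such a helper is RDD.zipWithIndex, so the sketch below is a simplified, hypothetical reconstruction (the real helper clearly takes more parameters: skey_last_val, sk_col_nme, 'part_num1', part_cnt, load_type). One property worth noting for this issue: RDD.zipWithIndex derives each row's index from the current partition layout, so the surrounding cache() calls matter; if part of the input is recomputed after a stage failure instead of being served from cache, the row-to-index assignment is not guaranteed to repeat.

{code:python}
# Hypothetical, simplified sketch of a zipWithIndex-style surrogate-key helper;
# NOT the reporter's dfZipWithIndex, whose extra parameters are omitted here.
from pyspark.sql.types import LongType, StructField, StructType

def df_zip_with_index_sketch(spark, df, last_val, sk_col_name):
    # Prepend a LongType surrogate-key column starting at last_val + 1.
    new_schema = StructType([StructField(sk_col_name, LongType(), False)]
                            + df.schema.fields)
    indexed = (df.rdd.zipWithIndex()
                 .map(lambda pair: (pair[1] + last_val + 1,) + tuple(pair[0])))
    return spark.createDataFrame(indexed, new_schema)

# Usage with the toy frames from the sketches above:
# df_insert = df_zip_with_index_sketch(spark, df_insert, 0, "SKEY")
{code}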