Umesh Kant created SPARK-43818:
----------------------------------

             Summary: Spark Glue job introduces duplicates while writing a dataframe as file to S3
                 Key: SPARK-43818
                 URL: https://issues.apache.org/jira/browse/SPARK-43818
             Project: Spark
          Issue Type: Bug
          Components: PySpark
    Affects Versions: 3.1.0
         Environment: Production
            Reporter: Umesh Kant
We have an AWS Glue (Spark) based ETL framework which processes data through multiple hops and finally writes the dataframe to an S3 bucket as Parquet files with Snappy compression. We have used this framework to process and write data to S3 for 1000+ tables/files, and it works fine. But for two of the tables, the in-memory dataframe contains the correct records, yet when the dataframe is persisted to S3 as files it picks up duplicate entries; and because the total record count stays the same, the duplicates also mean missing records.

{*}Data Points{*}:
 # This happens only for large tables (wide tables with millions of rows)
 # When this happens we notice stage failures; the retries succeed, but cause the duplicate/missing records

{*}Code Steps{*}:
|Steps Information|Dataframe|Query / Operation / Action|
|Query raw DB & get the number of partitions (to loop over one by one)| |select distinct partition_0 FROM <raw db >.<table name>|
|Raw DF query|raw|select SCHDWKID_REF, TASK_REF, LIFECYCLE_REF, TASK_DESC, WHOSE_ENT_NAME, WHOSE_INST_REF, WHOSE_INST_CDE, STENDDAT_STRTDT, STENDDAT_ENDDAT, AGENT_ENT_NAME, AGENT_INST_REF, AGENT_INST_CDE, AGENT_CODE, LOCATION_ENT_NAME, LOCATION_INST_REF, LOCATION_INST_CDE, CASEID_NUMBER, FACE_AMT, TAAR_AMT, AUTH_AMT, TRANSFER_YORN_ENCODE, TRANSFER_YORN_DECODE, TRANSFER_YORN_ELMREF, CASE_YORN_ENCODE, CASE_YORN_DECODE, CASE_YORN_ELMREF, CHANGEID_REF, CNTRCTID_REF, CNTRCTID_NUMBER, KTKDSCID_REF, KWNOFFID_REF, KWNOFFID_CODE, USERID_REF, USERID_CODE, WQUEUEID_REF, WQUEUEID_CODE, STATUS_REF, STATUS_CODE, STATUS_ASAT, LASTUPD_USER, LASTUPD_TERMNO, LASTUPD_PROG, LASTUPD_INFTIM, KWQPRIID_REF, KWQPRIID_CODE, INSURED_NAME, AGENT_NAME, EDM_INGESTED_AT, EDM_INGEST_TIME, PARTITION_0, DELTA_IND, TRANSACT_SEQ from RAW_ORACLE_ORAP12_NYLDPROD60CL.SCHEDULED_WORK where partition_0= '20230428'|
|Structured DF query|structured|SELECT * FROM RL_LAKE_ORACLE_ORAP12_NYLDPROD60CL.SCHEDULED_WORK WHERE part_num > 0|
|Merge DF generated by joining raw & structured on the natural keys|df_merge|df_merge = structured.join(raw,keys,how='fullouter')|
|action column added to the merge DF|df_merge|df_merge = df_merge.withColumn("action", fn.when((((df_merge['structured.EDH_RECORD_STATUS_IN'] == 'A') \| (df_merge['structured.EDH_RECORD_STATUS_IN'] == 'D')) & ( df_merge['raw.chksum'].isNull()) & (~ df_merge['structured.CHKSUM'].isNull())) , "NOACTION") .when((df_merge['structured.CHKSUM'].isNull()) & (df_merge['raw.delta_ind']!= 'D'), "INSERT") .when((df_merge['structured.CHKSUM'] != df_merge['raw.chksum']) & (~ df_merge['structured.CHKSUM'].isNull()) & (df_merge['structured.EDH_RECORD_STATUS_IN'] == 'A') & ((df_merge['raw.delta_ind'] == 'U') \| (df_merge['raw.delta_ind'] == 'I')), "UPDATE") .when(((df_merge['raw.delta_ind']== 'D') & (df_merge['structured.EDH_RECORD_STATUS_IN'] == 'A')) , "DELETE") .when(((df_merge['raw.delta_ind']== 'D') & (df_merge['structured.EDH_RECORD_STATUS_IN'] == 'D') ) , "DELETECOPY") .when(((df_merge['raw.delta_ind']== 'I') & (df_merge['structured.EDH_RECORD_STATUS_IN'] == 'D') & (~ df_merge['raw.chksum'].isNull()) & (~ df_merge['structured.CHKSUM'].isNull())) , "DELETEREINSERT") .when(((df_merge['raw.delta_ind']== 'D') & (df_merge['structured.CHKSUM'].isNull())) , "DELETEABSENT") .when((df_merge['structured.CHKSUM'] == df_merge['raw.chksum']), "NOCHANGE"))|
|No-action DF derived from the merge DF|df_noaction|df_noaction = df_merge.select(keys + ['structured.' + x.upper() for x in structured_cols_list if x.upper() not in keys]).where((df_merge.action == 'NOACTION') \| (df_merge.action == 'NOCHANGE'))|
|Delete-copy DF derived|df_dcopy|df_dcopy = df_merge.select(keys + ['structured.' + x.upper() for x in structured_cols_list if x.upper() not in keys]).where(df_merge.action == 'DELETECOPY')|
|Delete-absent DF derived|df_dabs|df_dabs = df_merge.select(keys + ['raw.' + x.upper() for x in raw_cols_list if x.upper() not in keys]).where(df_merge.action == 'DELETEABSENT')|
|Insert DF derived|df_insert|df_insert = df_merge.select(keys + ['raw.' + x.upper() for x in raw_cols_list if x.upper() not in keys]).where(df_merge.action == 'INSERT')|
|Outdated DF derived (records from structured for which we have updates)|df_outdated|df_outdated = df_merge.select(keys + ['structured.' + x.upper() for x in structured_cols_list if x.upper() not in keys]).where(df_merge.action == 'UPDATE')|
|Deleted records derived (inactive)|df_delrecIn|df_delrecIn = df_merge.select(keys + ['structured.' + x.upper() for x in structured_cols_list if x.upper() not in keys]).where(df_merge.action == 'DELETE')|
|Deleted records derived (active)|df_delrecAc|df_delrecAc = df_merge.select(keys + ['structured.' + x.upper() for x in structured_cols_list if x.upper() not in keys]).where(df_merge.action == 'DELETE')|
|Deleted & re-inserted records derived (inactive)|df_delReInsertInactive|df_delReInsertInactive = df_merge.select(keys + ['structured.' + x.upper() for x in structured_cols_list if x.upper() not in keys]).where(df_merge.action == 'DELETEREINSERT')|
|Deleted & re-inserted records derived (active)|df_delReInsertActive|df_delReInsertActive = df_merge.select(keys + ['raw.' + x.upper() for x in raw_cols_list if x.upper() not in keys]).where(df_merge.action == 'DELETEREINSERT')|
|Updated records DF|df_update|df_update = df_merge.select(keys + ['raw.' + x.upper() for x in raw_cols_list if x.upper() not in keys]).where(df_merge.action == 'UPDATE')|
|Active DF built by unioning all active DFs generated previously|df_active|df_active = df_insert.unionAll(df_update).unionAll(df_delrecAc).unionAll(df_delReInsertActive).unionAll(df_dabs)|
|Cache the DF in memory|df_active|df_active=df_active.cache()|
|Store df_active for the debug tables in df_active_before_index|df_active_before_index|df_active_before_index = df_active|
|Add surrogate key (sk) column values to the active DF (see the dfZipWithIndex sketch below the table)|df_active|df_active=dfZipWithIndex(df_active,skey_last_val,sk_col_nme,'part_num1',int(part_cnt),load_type)|
|Cache the DF in memory|df_active|df_active = df_active.cache()|
|New DF created from the no-action & delete-copy DFs|df_active2|df_active2=df_noaction.unionAll(df_dcopy)|
|Union of the previously generated active2 & active DFs|df_active|df_active=df_active.unionAll(df_active2)|
|Updated inactive DF|df_inactive_final|df_inactive_final = df_outdated.unionAll(df_delrecIn).unionAll(df_delReInsertInactive)|
|Adding to debug tables|df_merge, df_active_final, df_inactive_final, raw, structured, df_active2, df_active_before_index|These debug table steps were added to troubleshoot the duplicate/missing records issue. The (external) tables can be queried.|
|Debug tables we can query|scheduled_work_active_bindex, scheduled_work_active, scheduled_work_active2, scheduled_work_before_write_active, scheduled_work_inactive, scheduled_work_merge, scheduled_work_raw, scheduled_work_structured|These debug table steps were added to troubleshoot the duplicate/missing records issue. The (external) tables can be queried.|
|Change audit data types of the active & inactive DFs| |change_audit_datatype(fhz_df,structured_audit_col_names)|
|Nulls in the active data set replaced with blanks|df|df=df.fillna('')|
|Active data set column renaming (see the df_col_rename sketch below the table)| |conf_df=df_col_rename(df, raw_all_col_n_partcol, structured_all_col_names)|
|Trim applied to the varchar columns of the active data set (see the trim-loop sketch below the table)| |conf_df = conf_df.withColumn(i[0], fn.trim(conf_df[i[0]]))|
|Active data set column renaming| |df = df.withColumnRenamed("PART_NUM","part_num")|
|Repartition the data set based on the number of DPUs|df|df=df.repartition(no_of_dpu)|
|Drop unused columns from the active data set|df| |
|Delete partitions| |del_partition|
|Delete active files| |del_File(bucket_name,del_path, part_col_cnt)|
|Write the active data set (the same data that is written to the S3 structured path) to the debug tables|df (debug table name = scheduled_work_before_write_active)|create_temp_table(df_dict, final_tbl, tbl_path, str_dir_path,'ldap')|
|Write the active DF to the structured layer (see the write sketch below the table)|df|wrt_df|
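A few hedged sketches of the framework helpers referenced in the table follow; they are simplified assumptions for illustration, not the actual framework code. First, dfZipWithIndex: the real helper is called as dfZipWithIndex(df_active, skey_last_val, sk_col_nme, 'part_num1', int(part_cnt), load_type); the stand-in below only shows the common RDD zipWithIndex pattern for appending a surrogate key, with a made-up function name, column name, and reduced argument list. Index assignment of this kind depends on partition ordering, so it may be worth examining in connection with the stage failures/retries noted above.

{code:python}
from pyspark.sql import SparkSession
from pyspark.sql.types import LongType, StructField, StructType

spark = SparkSession.builder.getOrCreate()

def df_zip_with_index(df, offset, sk_col_name):
    """Simplified, hypothetical stand-in for dfZipWithIndex: prepend a
    surrogate-key column whose values start at offset + 1."""
    new_schema = StructType([StructField(sk_col_name, LongType(), False)] + df.schema.fields)
    # zipWithIndex pairs each row with its position; fold that position
    # into the row as the surrogate-key value.
    zipped = df.rdd.zipWithIndex().map(
        lambda pair: (pair[1] + offset + 1,) + tuple(pair[0])
    )
    return spark.createDataFrame(zipped, new_schema)

# Usage against a placeholder frame; skey_last_val plays the role of offset.
df_active = spark.createDataFrame([("A",), ("D",)], ["STATUS_CODE"])
df_active = df_zip_with_index(df_active, offset=100, sk_col_name="SK_REF")
df_active.show()
{code}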
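df_col_rename is likewise a framework helper whose body is not shown in the table; a plausible sketch, assuming it simply renames columns pairwise from the raw column names to the structured column names, is:

{code:python}
def df_col_rename(df, old_names, new_names):
    """Hypothetical sketch of df_col_rename: rename columns pairwise,
    e.g. from raw_all_col_n_partcol to structured_all_col_names."""
    for old, new in zip(old_names, new_names):
        df = df.withColumnRenamed(old, new)
    return df
{code}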
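The varchar trim step in the table shows only the body of a loop; the surrounding loop presumably iterates over some column metadata. A sketch assuming a list of (column_name, data_type) pairs:

{code:python}
from pyspark.sql import SparkSession
from pyspark.sql import functions as fn

spark = SparkSession.builder.getOrCreate()

# Placeholder frame and column metadata; in the actual job these come from
# the structured table definition.
conf_df = spark.createDataFrame([(" A ", 1)], ["STATUS_CODE", "PART_NUM"])
col_metadata = [("STATUS_CODE", "varchar(10)"), ("PART_NUM", "int")]

# Trim only the varchar columns; the inner line matches the one in the table.
for i in col_metadata:
    if i[1].lower().startswith("varchar"):
        conf_df = conf_df.withColumn(i[0], fn.trim(conf_df[i[0]]))
{code}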
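Finally, wrt_df is the step after which the duplicates are observed; its internals are not shown in the table. A minimal sketch of a Parquet write to the structured S3 path with Snappy compression (the bucket, path, and partition column below are placeholders, not the real job configuration):

{code:python}
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Placeholder for the final active dataframe.
df = spark.createDataFrame([(1, "A", 1)], ["SCHDWKID_REF", "STATUS_CODE", "part_num"])

# Parquet write with Snappy compression to the structured layer.
(df.write
   .mode("overwrite")
   .option("compression", "snappy")
   .partitionBy("part_num")
   .parquet("s3://<bucket>/<structured path>/SCHEDULED_WORK/"))
{code}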