Hello all, I hit the following exception while writing data after several executors had been lost.
org.apache.hadoop.fs.FileAlreadyExistsException: File already exists: wasbs://a...@bbb.blob.core.windows.net/PATH/TO/.spark-staging-8836d4b3-3971-454e-ac08-6d899215eebd/ID=302/part-08396-8836d4b3-3971-454e-ac08-6d899215eebd.c000.snappy.parquet
    at org.apache.hadoop.fs.azure.NativeAzureFileSystem.createInternal(NativeAzureFileSystem.java:1821)
    at org.apache.hadoop.fs.azure.NativeAzureFileSystem.create(NativeAzureFileSystem.java:1788)
    at org.apache.hadoop.fs.azure.NativeAzureFileSystem.create(NativeAzureFileSystem.java:1632)
    at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:1118)
    at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:1098)
    at org.apache.parquet.hadoop.util.HadoopOutputFile.create(HadoopOutputFile.java:74)
    at org.apache.parquet.hadoop.ParquetFileWriter.<init>(ParquetFileWriter.java:248)
    at org.apache.parquet.hadoop.ParquetOutputFormat.getRecordWriter(ParquetOutputFormat.java:390)
    at org.apache.parquet.hadoop.ParquetOutputFormat.getRecordWriter(ParquetOutputFormat.java:349)
    at org.apache.spark.sql.execution.datasources.parquet.ParquetOutputWriter.<init>(ParquetOutputWriter.scala:37)
    at org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat$$anon$1.newInstance(ParquetFileFormat.scala:151)
    at org.apache.spark.sql.execution.datasources.DynamicPartitionDataWriter.newOutputWriter(FileFormatDataWriter.scala:236)
    at org.apache.spark.sql.execution.datasources.DynamicPartitionDataWriter.write(FileFormatDataWriter.scala:260)

I looked through the codebase and found that if abortTask fails to delete the temp files, this exception can be thrown on the next attempt. The reason is that the temp file name for the next attempt is the same as the previous attempt's, because the temp file name format does not include the taskAttemptId:
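To make the failure mode concrete, here is a minimal local sketch (using java.nio on the local filesystem, not the Azure driver; the file name is made up) of a retried attempt colliding with a file left behind by a failed first attempt:

```scala
import java.nio.file.{Files, StandardOpenOption, FileAlreadyExistsException}

object CollisionSketch extends App {
  val staging = Files.createTempDirectory("spark-staging-sketch")
  val part = staging.resolve("part-08396.c000.snappy.parquet")

  // Attempt 0 wrote some data, failed, and abortTask could not delete the file.
  Files.write(part, Array[Byte](1, 2, 3))

  // Attempt 1 uses the exact same name (the format has no taskAttemptId),
  // and creating the file exclusively fails just like the Azure driver does.
  try {
    Files.newOutputStream(part, StandardOpenOption.CREATE_NEW)
  } catch {
    case _: FileAlreadyExistsException => println("second attempt collides")
  }
}
```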
    private def getFilename(taskContext: TaskAttemptContext, ext: String): String = {
      // The file name looks like part-00000-2dd664f9-d2c4-4ffe-878f-c6c70c1fb0cb_00003-c000.parquet
      // Note that %05d does not truncate the split number, so if we have more than 100000 tasks,
      // the file name is fine and won't overflow.
      val split = taskContext.getTaskAttemptID.getTaskID.getId
      f"part-$split%05d-$jobId$ext"
    }

How can we handle this exception? Should we check for and remove the temp file before writing the data (in newFile) on a new attempt? Or is the exception thrown intentionally for some reason? Please share your thoughts.

Thanks,
Eunjin
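One alternative to deleting the stale file would be to make retried attempts write to distinct names. This is only a sketch of the idea, not Spark's actual code: getFilenameWithAttempt is a hypothetical variant that folds the attempt number into the name, so a leftover file from attempt 0 can never collide with attempt 1.

```scala
// Hypothetical variant of getFilename: include the task attempt number
// in the temp file name so retried attempts never reuse a stale name.
def getFilenameWithAttempt(split: Int, attemptNumber: Int, jobId: String, ext: String): String =
  f"part-$split%05d-$attemptNumber%03d-$jobId$ext"

// Attempt 0 and attempt 1 of the same task now produce distinct names:
println(getFilenameWithAttempt(8396, 0, "8836d4b3", ".c000.snappy.parquet"))
println(getFilenameWithAttempt(8396, 1, "8836d4b3", ".c000.snappy.parquet"))
```

The trade-off is that the commit protocol would then need to know which attempt's file to move into place, so simply deleting the stale file before create may be the smaller change.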