One of the issues here is that Parquet creates files with overwrite=false; other output formats do not, and so implicitly overwrite the output of previous attempts. That's fine if you are confident that each task attempt (henceforth: TA) is writing to an isolated path.
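
For illustration, here's a minimal spark-shell-style sketch of the two kinds of
create() call against the Hadoop FileSystem API. The path here is made up, and
the real calls are buried inside the Parquet and Spark writer code; this is only
to show what the overwrite flag changes.

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

// hypothetical staging path; the real one comes from the committer's work dir
val path = new Path("wasbs://container@account.blob.core.windows.net/staging/part-00000.snappy.parquet")
val fs = FileSystem.get(path.toUri, new Configuration())

// what the Parquet writer effectively asks for: fail if the file is already
// there, so a retry that reuses the same name throws FileAlreadyExistsException
val strict = fs.create(path, /* overwrite = */ false)
strict.close()

// what most other formats effectively ask for: quietly replace whatever an
// earlier attempt left behind at the same path
val lenient = fs.create(path, /* overwrite = */ true)
lenient.close()

With overwrite=false, a second attempt that reuses the same staging name fails
exactly like the trace quoted below; with overwrite=true, the retry silently
clobbers the earlier attempt's file.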
The next iteration of the S3A magic committer will simply turn off all checks
for file/dir overwrite when writing under a path with /__magic/ in it, to save
on IO, relying on the commit protocol itself to isolate TA output:
https://github.com/apache/hadoop/pull/3289

I think here you'll have to make sure that if >1 TA runs simultaneously
(i.e. speculation is on, or one task attempt has become partitioned from the
driver and the driver has started a new one), then cleanup doesn't actually
produce inconsistent output. You want to make sure that

* each TA writes to its own private location (e.g. put the attempt id in the
  temp file name; there's a sketch of that after the quoted mail below)
* TA-to-task commit is atomic (and ideally, fast :)
* >1 invocation of TA.commit() ensures that only the output of the last
  committed task is found in the final job output
* job cleanup leaves nothing around to run up bills

Side issue: wasb doesn't have atomic dir rename, so it's not really safe here.
Consider moving to ADLS Gen2 storage and the abfs:// connector.

On Thu, 9 Sept 2021 at 05:50, EJ Song <sezr...@gmail.com> wrote:

> Hello all,
>
> I met the following exception while writing data after several executors
> had been lost.
>
> org.apache.hadoop.fs.FileAlreadyExistsException: File already exists:wasbs://
> a...@bbb.blob.core.windows.net/PATH/TO/.spark-staging-8836d4b3-3971-454e-ac08-6d899215eebd/ID=302/part-08396-8836d4b3-3971-454e-ac08-6d899215eebd.c000.snappy.parquet
>   at org.apache.hadoop.fs.azure.NativeAzureFileSystem.createInternal(NativeAzureFileSystem.java:1821)
>   at org.apache.hadoop.fs.azure.NativeAzureFileSystem.create(NativeAzureFileSystem.java:1788)
>   at org.apache.hadoop.fs.azure.NativeAzureFileSystem.create(NativeAzureFileSystem.java:1632)
>   at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:1118)
>   at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:1098)
>   at org.apache.parquet.hadoop.util.HadoopOutputFile.create(HadoopOutputFile.java:74)
>   at org.apache.parquet.hadoop.ParquetFileWriter.<init>(ParquetFileWriter.java:248)
>   at org.apache.parquet.hadoop.ParquetOutputFormat.getRecordWriter(ParquetOutputFormat.java:390)
>   at org.apache.parquet.hadoop.ParquetOutputFormat.getRecordWriter(ParquetOutputFormat.java:349)
>   at org.apache.spark.sql.execution.datasources.parquet.ParquetOutputWriter.<init>(ParquetOutputWriter.scala:37)
>   at org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat$$anon$1.newInstance(ParquetFileFormat.scala:151)
>   at org.apache.spark.sql.execution.datasources.DynamicPartitionDataWriter.newOutputWriter(FileFormatDataWriter.scala:236)
>   at org.apache.spark.sql.execution.datasources.DynamicPartitionDataWriter.write(FileFormatDataWriter.scala:260)
>
> I looked through the codebase and found out that if abortTask fails to
> delete the temp files, then the exception can be thrown on the next
> attempt.
> That's because the temp file name for the next attempt is the same as the
> previous attempt's: the temp file name format doesn't include the
> taskAttemptId.
>
> private def getFilename(taskContext: TaskAttemptContext, ext: String): String = {
>   // The file name looks like
>   // part-00000-2dd664f9-d2c4-4ffe-878f-c6c70c1fb0cb_00003-c000.parquet
>   // Note that %05d does not truncate the split number, so if we have
>   // more than 100000 tasks, the file name is fine and won't overflow.
>   val split = taskContext.getTaskAttemptID.getTaskID.getId
>   f"part-$split%05d-$jobId$ext"
> }
>
> How can we handle the exception?
> Should we check for and remove the temp file before writing the data
> (newFile) with a new attempt?
> Or should it throw the exception for some reason?
>
> Please share your thoughts on it.
>
> Thanks,
> Eunjin
>
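
As a sketch of the "private location" point above: a hypothetical variant of
the quoted getFilename which folds the attempt number into the staging file
name, so a retry can never collide with a failed attempt's leftover file.
This is illustration only, not the actual Spark change; jobId comes from the
enclosing writer class, exactly as in the quoted snippet.

// hypothetical variant for illustration only -- not the actual Spark fix
private def getFilename(taskContext: TaskAttemptContext, ext: String): String = {
  val taskAttemptId = taskContext.getTaskAttemptID
  val split = taskAttemptId.getTaskID.getId   // task index within the stage
  val attempt = taskAttemptId.getId           // attempt number: 0, 1, 2, ... per retry
  // including the attempt number gives each TA its own private file name
  f"part-$split%05d-$attempt%02d-$jobId$ext"
}

With a name like that, a failed abortTask can leave its temp file behind and
the next attempt still creates a different file, which task/job commit and
cleanup can then reconcile.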