Hello all,

I ran into the following exception while writing data after several executors
were lost.

org.apache.hadoop.fs.FileAlreadyExistsException: File already exists:wasbs://a...@bbb.blob.core.windows.net/PATH/TO/.spark-staging-8836d4b3-3971-454e-ac08-6d899215eebd/ID=302/part-08396-8836d4b3-3971-454e-ac08-6d899215eebd.c000.snappy.parquet
  at org.apache.hadoop.fs.azure.NativeAzureFileSystem.createInternal(NativeAzureFileSystem.java:1821)
  at org.apache.hadoop.fs.azure.NativeAzureFileSystem.create(NativeAzureFileSystem.java:1788)
  at org.apache.hadoop.fs.azure.NativeAzureFileSystem.create(NativeAzureFileSystem.java:1632)
  at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:1118)
  at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:1098)
  at org.apache.parquet.hadoop.util.HadoopOutputFile.create(HadoopOutputFile.java:74)
  at org.apache.parquet.hadoop.ParquetFileWriter.<init>(ParquetFileWriter.java:248)
  at org.apache.parquet.hadoop.ParquetOutputFormat.getRecordWriter(ParquetOutputFormat.java:390)
  at org.apache.parquet.hadoop.ParquetOutputFormat.getRecordWriter(ParquetOutputFormat.java:349)
  at org.apache.spark.sql.execution.datasources.parquet.ParquetOutputWriter.<init>(ParquetOutputWriter.scala:37)
  at org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat$$anon$1.newInstance(ParquetFileFormat.scala:151)
  at org.apache.spark.sql.execution.datasources.DynamicPartitionDataWriter.newOutputWriter(FileFormatDataWriter.scala:236)
  at org.apache.spark.sql.execution.datasources.DynamicPartitionDataWriter.write(FileFormatDataWriter.scala:260)

I looked through the codebase and it seems that if abortTask fails to delete
the temp file, this exception can be thrown on the next attempt:
the next attempt reuses exactly the same temp file name as the previous one,
because the file name format doesn't include the taskAttemptId.

private def getFilename(taskContext: TaskAttemptContext, ext: String): String = {
    // The file name looks like part-00000-2dd664f9-d2c4-4ffe-878f-c6c70c1fb0cb_00003-c000.parquet
    // Note that %05d does not truncate the split number, so if we have more than 100000 tasks,
    // the file name is fine and won't overflow.
    val split = taskContext.getTaskAttemptID.getTaskID.getId
    f"part-$split%05d-$jobId$ext"
  }
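
For illustration only, here is a rough sketch of what I mean (not a patch:
the helper name and the "-a<attempt>" suffix are made up, and jobId is passed
in explicitly instead of using the protocol's field). If the name also
encoded the attempt number, a retried attempt could not collide with a file
left behind by the previous attempt:

import org.apache.hadoop.mapreduce.TaskAttemptContext

// Hypothetical helper: fold the task *attempt* number into the temp file name.
def attemptQualifiedFilename(
    taskContext: TaskAttemptContext,
    jobId: String,
    ext: String): String = {
  val attemptId = taskContext.getTaskAttemptID
  val split = attemptId.getTaskID.getId  // task (split) number, same as today
  val attempt = attemptId.getId          // attempt number within the task
  // e.g. part-00000-<jobId>-a1-c000.parquet for the second attempt
  f"part-$split%05d-$jobId-a$attempt$ext"
}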

How can we handle this exception?
Should the new attempt check for and remove the leftover temp file before
writing the data (newFile), e.g. something like the sketch below?
Or is the exception thrown intentionally for some reason?
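
Roughly what I had in mind for the check-and-remove option, as a sketch (the
helper is hypothetical and would have to run before the writer is created):

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

// Hypothetical cleanup: before the new attempt opens its Parquet writer,
// drop any temp file that a failed previous attempt left behind.
def cleanupStaleTempFile(pathStr: String, conf: Configuration): Unit = {
  val path = new Path(pathStr)
  val fs = path.getFileSystem(conf)
  if (fs.exists(path)) {
    // The previous attempt's output is garbage anyway, so deleting it here
    // avoids the FileAlreadyExistsException on retry.
    fs.delete(path, /* recursive = */ false)
  }
}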

Please share your thoughts on it.

Thanks,
Eunjin
