one of the issues here is that Parquet creates its files with overwrite=false;
other output formats do not, so they implicitly overwrite the output of
previous attempts. That is fine as long as you are confident that each task
attempt (henceforth: TA) is writing to an isolated path.
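
a minimal sketch of the difference, going at the Hadoop FileSystem API
directly (the path here is just illustrative):

  import org.apache.hadoop.conf.Configuration
  import org.apache.hadoop.fs.{FileSystem, Path}

  val conf = new Configuration()
  val path = new Path("wasbs://container@account.blob.core.windows.net/out/part-00000.snappy.parquet")
  val fs   = FileSystem.get(path.toUri, conf)

  // Parquet's writer ends up in FileSystem.create(path, overwrite = false),
  // so a file left behind by an earlier attempt raises FileAlreadyExistsException.
  val out = fs.create(path, false)
  out.close()

  // most other formats take the overwrite = true route and silently replace
  // whatever a previous attempt wrote:
  // fs.create(path, true)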

the next iteration of the s3a magic committer will simply turn off all
checks for file/dir overwrite when writing under a path with /__magic/ in
it, to save on IO, relying on the commit protocol itself to isolate TA output:
https://github.com/apache/hadoop/pull/3289
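
illustrative sketch only, not the actual committer code: the idea is to skip
the pre-create existence probe whenever the destination sits under a magic
path, since the commit protocol guarantees TA isolation anyway:

  import org.apache.hadoop.fs.{FSDataOutputStream, FileSystem, Path}

  def createForTaskAttempt(fs: FileSystem, path: Path): FSDataOutputStream = {
    if (path.toUri.getPath.split("/").contains("__magic")) {
      // no getFileStatus probe, no overwrite check: save the IO round trips
      fs.create(path, true)
    } else {
      // normal strict behaviour: fail if something is already there
      fs.create(path, false)
    }
  }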

I think here you'll have to make sure that if more than one TA runs
simultaneously (i.e. speculation is on, or one task has become partitioned
from the driver and the driver has started a new attempt), then cleanup
doesn't actually produce inconsistent output. You want to make sure that
(see the sketch after this list):

* each TA writes to its own private location
* TA-to-task commit is atomic (and ideally, fast :)
* if TA.commit() is invoked more than once, only the output of the last
committed task attempt appears in the final job output
* job cleanup leaves nothing around to run up bills.
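
here's the shape those requirements imply, as a hypothetical sketch (names
and layout are mine, not Spark's FileCommitProtocol):

  import org.apache.hadoop.fs.{FileSystem, Path}

  class SketchCommitProtocol(fs: FileSystem, jobOutput: Path, jobId: String) {

    private val staging = new Path(jobOutput, s".staging-$jobId")

    // each TA writes into its own private directory
    def taskAttemptDir(taskId: Int, attemptId: Int): Path =
      new Path(staging, f"task-$taskId%05d-attempt-$attemptId")

    // task commit promotes the attempt directory to the per-task location with
    // a single rename: atomic on a real filesystem (not on wasb, see the side
    // note below); deleting any earlier committed attempt keeps only the last
    // committed attempt's output in the job output
    def commitTask(taskId: Int, attemptId: Int): Boolean = {
      val committed = new Path(staging, f"task-$taskId%05d")
      if (fs.exists(committed)) fs.delete(committed, true)
      fs.rename(taskAttemptDir(taskId, attemptId), committed)
    }

    // job cleanup deletes the whole staging tree so nothing is left behind
    def cleanupJob(): Unit = {
      fs.delete(staging, true)
    }
  }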

side issue: wasb doesn't have atomic dir rename, so it's not really safe.
Consider moving to ADLS Gen2 storage and the abfs:// connector.
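
hedged example of what the switch looks like from the Spark side; the
container, account and output path are placeholders and the auth config is
omitted:

  import org.apache.spark.sql.SparkSession

  val spark = SparkSession.builder().appName("abfs-example").getOrCreate()
  val df = spark.range(100).withColumnRenamed("id", "ID")

  // same DataFrame write, only the scheme and endpoint change; abfs:// against
  // a hierarchical-namespace (ADLS Gen2) account gives real dir rename semantics
  df.write
    .mode("overwrite")
    .partitionBy("ID")
    .parquet("abfs://mycontainer@myaccount.dfs.core.windows.net/PATH/TO/table")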


On Thu, 9 Sept 2021 at 05:50, EJ Song <sezr...@gmail.com> wrote:

> Hello all,
>
> I met the following exception while writing data after several executors
> had lost.
>
> org.apache.hadoop.fs.FileAlreadyExistsException: File already
> exists:wasbs://
> a...@bbb.blob.core.windows.net/PATH/TO/.spark-staging-8836d4b3-3971-454e-ac08-6d899215eebd/ID=302/part-08396-8836d4b3-3971-454e-ac08-6d899215eebd.c000.snappy.parquet
> at
> org.apache.hadoop.fs.azure.NativeAzureFileSystem.createInternal(NativeAzureFileSystem.java:1821)
> at
> org.apache.hadoop.fs.azure.NativeAzureFileSystem.create(NativeAzureFileSystem.java:1788)
> at
> org.apache.hadoop.fs.azure.NativeAzureFileSystem.create(NativeAzureFileSystem.java:1632)
> at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:1118)
> at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:1098)
> at
> org.apache.parquet.hadoop.util.HadoopOutputFile.create(HadoopOutputFile.java:74)
> at
> org.apache.parquet.hadoop.ParquetFileWriter.<init>(ParquetFileWriter.java:248)
> at
> org.apache.parquet.hadoop.ParquetOutputFormat.getRecordWriter(ParquetOutputFormat.java:390)
> at
> org.apache.parquet.hadoop.ParquetOutputFormat.getRecordWriter(ParquetOutputFormat.java:349)
> at
> org.apache.spark.sql.execution.datasources.parquet.ParquetOutputWriter.<init>(ParquetOutputWriter.scala:37)
> at
> org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat$$anon$1.newInstance(ParquetFileFormat.scala:151)
> at
> org.apache.spark.sql.execution.datasources.DynamicPartitionDataWriter.newOutputWriter(FileFormatDataWriter.scala:236)
> at
> org.apache.spark.sql.execution.datasources.DynamicPartitionDataWriter.write(FileFormatDataWriter.scala:260)
>
> I looked through the codebase and found that if abortTask fails to
> delete the temp files, then this exception can be thrown on the next
> attempt.
> This is because the temp file name for the next attempt is the same as the
> previous attempt's, since the temp file name format doesn't include the "taskAttemptId".
>
> private def getFilename(taskContext: TaskAttemptContext, ext: String): String = {
>   // The file name looks like
>   // part-00000-2dd664f9-d2c4-4ffe-878f-c6c70c1fb0cb_00003-c000.parquet
>   // Note that %05d does not truncate the split number, so if we have
>   // more than 100000 tasks, the file name is fine and won't overflow.
>   val split = taskContext.getTaskAttemptID.getTaskID.getId
>   f"part-$split%05d-$jobId$ext"
> }
>
> How can we handle this exception?
> Should we check for and remove the temp file before writing the data
> (newFile) on a new attempt?
> Or should the exception be thrown for some reason?
>
> Please share your thoughts on it.
>
> Thanks,
> Eunjin
>
