As a bare minimum, you will need to add some error trapping and exception handling!
scala> import org.apache.hadoop.fs.FileAlreadyExistsException
import org.apache.hadoop.fs.FileAlreadyExistsException

and wrap your write in a try/catch:

try {
  df
    .coalesce(1)
    .write
    .option("fs.s3a.committer.require.uuid", "true")
    .option("fs.s3a.committer.generate.uuid", "true")
    .option("fs.s3a.committer.name", "magic")
    .option("fs.s3a.committer.magic.enabled", "true")
    .option("orc.compress", "zlib")
    .mode(SaveMode.Append)
    .orc(path)
} catch {
  case e: FileAlreadyExistsException =>
    println("File already exists. Handling it...")
  // other catch blocks for the other exceptions?
}

Catching FileAlreadyExistsException allows you to continue without crashing.

Another, more brute-force approach: instead of *SaveMode.Append*, you can try using *SaveMode.Overwrite*. This will overwrite the existing data if it already exists.

HTH

Mich Talebzadeh,
Dad | Technologist | Solutions Architect | Engineer
London
United Kingdom

view my Linkedin profile
<https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>

https://en.everybodywiki.com/Mich_Talebzadeh

*Disclaimer:* The information provided is correct to the best of my knowledge but of course cannot be guaranteed. It is essential to note that, as with any advice, one verified and tested result holds more weight than a thousand expert opinions.

On Fri, 16 Feb 2024 at 21:25, Рамик И <ramik...@gmail.com> wrote:

> Hi,
> I'm using Spark Streaming to read from Kafka and write to S3. Sometimes I
> get errors when writing: org.apache.hadoop.fs.FileAlreadyExistsException.
>
> Spark version: 3.5.0
> Scala version: 2.13.8
> Cluster: k8s
>
> libraryDependencies:
> org.apache.hadoop.hadoop-aws 3.3.4
> com.amazonaws.aws-java-sdk-s3 1.12.600
>
> code:
>
> df
>   .coalesce(1)
>   .write
>   .option("fs.s3a.committer.require.uuid", "true")
>   .option("fs.s3a.committer.generate.uuid", "true")
>   .option("fs.s3a.committer.name", "magic")
>   .option("fs.s3a.committer.magic.enabled", "true")
>   .option("orc.compress", "zlib")
>   .mode(SaveMode.Append)
>   .orc(path)
>
> executor 9:
>
> 24/02/16 13:05:25 INFO AbstractS3ACommitter: Job UUID 6188aaf6-78a2-4c5a-bafc-0e285d8b89f3 source fs.s3a.committer.uuid
> 24/02/16 13:05:25 INFO AbstractS3ACommitterFactory: Using committer magic to output data to s3a://mybucket/test
> 24/02/16 13:05:25 INFO AbstractS3ACommitterFactory: Using Committer MagicCommitter{AbstractS3ACommitter{role=Task committer attempt_202402161305112153373254688311399_0367_m_000000_13217, name=magic, outputPath=s3a://mybucket/test, workPath=s3a://mybucket/test/__magic/job-6188aaf6-78a2-4c5a-bafc-0e285d8b89f3/tasks/attempt_202402161305112153373254688311399_0367_m_000000_13217/__base, uuid='6188aaf6-78a2-4c5a-bafc-0e285d8b89f3', uuid source=JobUUIDSource{text='fs.s3a.committer.uuid'}}} for s3a://mybucket/test
> 24/02/16 13:05:25 INFO SQLHadoopMapReduceCommitProtocol: Using output committer class org.apache.hadoop.fs.s3a.commit.magic.MagicS3GuardCommitter
> 24/02/16 13:05:25 INFO AbstractS3ACommitter: Starting: Setup Task attempt_202402161305112153373254688311399_0367_m_000000_13217
> 24/02/16 13:05:25 INFO AbstractS3ACommitter: Setup Task attempt_202402161305112153373254688311399_0367_m_000000_13217: duration 0:00.061s
> 24/02/16 13:05:25 ERROR Executor: Exception in task 0.2 in stage 367.1 (TID 13217)
> org.apache.hadoop.fs.FileAlreadyExistsException: s3a://mybucket/test/part-00000-bce21fe2-4e56-4075-aafe-6160b3b0334a-c000.zlib.orc already exists
>
> executor 10:
>
> 24/02/16 13:05:24 INFO AbstractS3ACommitter: Job UUID 6188aaf6-78a2-4c5a-bafc-0e285d8b89f3 source fs.s3a.committer.uuid
> 24/02/16 13:05:24 INFO AbstractS3ACommitterFactory: Using committer magic to output data to s3a://mybucket/test
> 24/02/16 13:05:24 INFO AbstractS3ACommitterFactory: Using Committer MagicCommitter{AbstractS3ACommitter{role=Task committer attempt_202402161305112153373254688311399_0367_m_000000_13216, name=magic, outputPath=s3a://mybucket/test, workPath=s3a://mybucket/test/__magic/job-6188aaf6-78a2-4c5a-bafc-0e285d8b89f3/tasks/attempt_202402161305112153373254688311399_0367_m_000000_13216/__base, uuid='6188aaf6-78a2-4c5a-bafc-0e285d8b89f3', uuid source=JobUUIDSource{text='fs.s3a.committer.uuid'}}} for s3a://mybucket/test
> 24/02/16 13:05:24 INFO SQLHadoopMapReduceCommitProtocol: Using output committer class org.apache.hadoop.fs.s3a.commit.magic.MagicS3GuardCommitter
> 24/02/16 13:05:24 INFO AbstractS3ACommitter: Starting: Setup Task attempt_202402161305112153373254688311399_0367_m_000000_13216
> 24/02/16 13:05:24 INFO AbstractS3ACommitter: Setup Task attempt_202402161305112153373254688311399_0367_m_000000_13216: duration 0:00.112s
> 24/02/16 13:05:24 ERROR Executor: Exception in task 0.1 in stage 367.1 (TID 13216)
> org.apache.hadoop.fs.FileAlreadyExistsException: s3a://mybucket/test/part-00000-bce21fe2-4e56-4075-aafe-6160b3b0334a-c000.zlib.orc already exists
>
> how can I fix it?
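For reference, the two suggestions from the reply above (catch FileAlreadyExistsException, or switch to SaveMode.Overwrite) could be combined into one sketch. This is only an illustration of the control flow, not a tested fix: the `writeOrc` helper name is mine, `df` and `path` are assumed to be the same values as in the original code, and falling back to Overwrite is my assumption about how the two ideas might be combined.

```scala
import org.apache.hadoop.fs.FileAlreadyExistsException
import org.apache.spark.sql.{DataFrame, SaveMode}

// Sketch only: first attempt an Append; if the part file already exists,
// retry the same write with Overwrite. Assumes df and path are defined
// as in the original code. `writeOrc` is a hypothetical helper name.
def writeOrc(df: DataFrame, path: String): Unit = {
  def write(mode: SaveMode): Unit =
    df
      .coalesce(1)
      .write
      .option("fs.s3a.committer.require.uuid", "true")
      .option("fs.s3a.committer.generate.uuid", "true")
      .option("fs.s3a.committer.name", "magic")
      .option("fs.s3a.committer.magic.enabled", "true")
      .option("orc.compress", "zlib")
      .mode(mode)
      .orc(path)

  try {
    write(SaveMode.Append)
  } catch {
    case _: FileAlreadyExistsException =>
      // Caution: Overwrite deletes whatever already exists at `path`.
      println(s"File already exists under $path; retrying with Overwrite")
      write(SaveMode.Overwrite)
  }
}
```

Bear in mind that Overwrite replaces all existing data at the target path, which is usually not what you want in a streaming job that appends batches; treat the fallback as a demonstration of the try/catch structure rather than a production remedy.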