[ https://issues.apache.org/jira/browse/SPARK-32395?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Wan Kun updated SPARK-32395: ---------------------------- Description: Generally, distributed jobs have two stages of committing files: committing each task's output files and committing the job's output files. If one attempt fails, another attempt will try to run the task again; after all tasks succeed, the job will commit the output of all tasks. But now if we run a dynamic partition overwrite job, for example, *INSERT OVERWRITE table dst partition(part) SELECT * from src*, then if one of the final stage tasks fails, the job will fail. The first task attempt's data writer in the final stage writes its output data directly to the Spark staging directory. If the first task attempt fails, the second task attempt's data writer will fail to set up, because the task's output file already exists. Then the job will fail. Therefore, I think we should write the temporary data to the task attempt's work directory, and commit its result files after the task attempt succeeds. Error Case !111.png! !112.png! Task Attempt2 Error LOG {code:java} 00:29:16.512 Executor task launch worker for task 3405 ERROR org.apache.spark.util.Utils: Aborting task org.apache.hadoop.fs.FileAlreadyExistsException: /data/hive/pdd/pdd_bot_marketing/app_order_reminder_dispatch_stats/.spark-staging-f6716d57-31eb-44f0-9f55-dfc0939e1fde/dt=20200716/part-00000-f6716d57-31eb-44f0-9f55-dfc0939e1fde.c000.snappy.parquet for client 192.168.127.9 already exists at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.startFileInternal(FSNamesystem.java:2938) at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.startFileInt(FSNamesystem.java:2827) at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.startFile(FSNamesystem.java:2712) at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.create(NameNodeRpcServer.java:604) at org.apache.hadoop.hdfs.server.namenode.AuthorizationProviderProxyClientProtocol.create(AuthorizationProviderProxyClientProtocol.java:115) at 
org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.create(ClientNamenodeProtocolServerSideTranslatorPB.java:412) at org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java) at org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:617) at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:1073) at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2226) at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2222) at java.security.AccessController.doPrivileged(Native Method) at javax.security.auth.Subject.doAs(Subject.java:422) at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1920) at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2220) at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method) at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62) at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45) at java.lang.reflect.Constructor.newInstance(Constructor.java:423) at org.apache.hadoop.ipc.RemoteException.instantiateException(RemoteException.java:106) at org.apache.hadoop.ipc.RemoteException.unwrapRemoteException(RemoteException.java:73) at org.apache.hadoop.hdfs.DFSOutputStream.newStreamForCreate(DFSOutputStream.java:2134) at org.apache.hadoop.hdfs.DFSClient.create(DFSClient.java:1781) at org.apache.hadoop.hdfs.DFSClient.create(DFSClient.java:1705) at org.apache.hadoop.hdfs.DistributedFileSystem$7.doCall(DistributedFileSystem.java:437) at org.apache.hadoop.hdfs.DistributedFileSystem$7.doCall(DistributedFileSystem.java:433) at org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81) at org.apache.hadoop.hdfs.DistributedFileSystem.create(DistributedFileSystem.java:433) at 
org.apache.hadoop.hdfs.DistributedFileSystem.create(DistributedFileSystem.java:374) at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:926) at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:907) at org.apache.parquet.hadoop.util.HadoopOutputFile.create(HadoopOutputFile.java:74) at org.apache.parquet.hadoop.ParquetFileWriter.<init>(ParquetFileWriter.java:248) at org.apache.parquet.hadoop.ParquetOutputFormat.getRecordWriter(ParquetOutputFormat.java:390) at org.apache.parquet.hadoop.ParquetOutputFormat.getRecordWriter(ParquetOutputFormat.java:349) at org.apache.spark.sql.execution.datasources.parquet.ParquetOutputWriter.<init>(ParquetOutputWriter.scala:37) at org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat$$anon$1.newInstance(ParquetFileFormat.scala:150) at org.apache.spark.sql.execution.datasources.DynamicPartitionDataWriter.newOutputWriter(FileFormatDataWriter.scala:241) at org.apache.spark.sql.execution.datasources.DynamicPartitionDataWriter.write(FileFormatDataWriter.scala:262) at org.apache.spark.sql.execution.datasources.FileFormatWriter$.$anonfun$executeTask$1(FileFormatWriter.scala:273) at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1411) at org.apache.spark.sql.execution.datasources.FileFormatWriter$.executeTask(FileFormatWriter.scala:281) at org.apache.spark.sql.execution.datasources.FileFormatWriter$.$anonfun$write$15(FileFormatWriter.scala:205) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90) at org.apache.spark.scheduler.Task.run(Task.scala:127) at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:444) at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1377) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:447) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at 
java.lang.Thread.run(Thread.java:748).....; not retrying 00:29:16.583 main ERROR FileFormatWriter: Aborting job 6de09c5c-b425-4d01-b5c0-1aa0a6e3f58e. org.apache.spark.SparkException: Job aborted due to stage failure: Task 0.1 in stage 162.0 (TID 3405) can not write to output file: org.apache.hadoop.fs.FileAlreadyExistsException: /data/hive/pdd/pdd_bot_marketing/app_order_reminder_dispatch_stats/.spark-staging-f6716d57-31eb-44f0-9f55-dfc0939e1fde/dt=20200716/part-00000-f6716d57-31eb-44f0-9f55-dfc0939e1fde.c000.snappy.parquet for client 192.168.127.9 already exists at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.startFileInternal(FSNamesystem.java:2938) at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.startFileInt(FSNamesystem.java:2827) at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.startFile(FSNamesystem.java:2712) at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.create(NameNodeRpcServer.java:604) at org.apache.hadoop.hdfs.server.namenode.AuthorizationProviderProxyClientProtocol.create(AuthorizationProviderProxyClientProtocol.java:115) at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.create(ClientNamenodeProtocolServerSideTranslatorPB.java:412) at org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java) at org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:617) at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:1073) at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2226) at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2222) at java.security.AccessController.doPrivileged(Native Method) at javax.security.auth.Subject.doAs(Subject.java:422) at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1920) at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2220) at 
org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2023) at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:1972) at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:1971) at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62) at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55) at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49) at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1971) at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:950) at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:950) at scala.Option.foreach(Option.scala:407) at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:950) at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2203) at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2152) at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2203) at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2152) at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2141) at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49) at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:752) at org.apache.spark.SparkContext.runJob(SparkContext.scala:2093) at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:195) at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:178) at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult$lzycompute(commands.scala:108) at 
org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult(commands.scala:106) at org.apache.spark.sql.execution.command.DataWritingCommandExec.executeCollect(commands.scala:120) at org.apache.spark.sql.Dataset.$anonfun$logicalPlan$1(Dataset.scala:229) at org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:3616) at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:100) at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:160) at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:87) at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:763) at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64) at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3614) at org.apache.spark.sql.Dataset.<init>(Dataset.scala:229) at org.apache.spark.sql.Dataset$.$anonfun$ofRows$2(Dataset.scala:100) at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:763) at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:97) at org.apache.spark.sql.SparkSession.$anonfun$sql$1(SparkSession.scala:606) at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:763) at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:601) at org.apache.spark.sql.SparkSqlRunner.run(SparkSqlRunner.scala:34) at com.leyan.insight.Monitor$.executeSql$1(Monitor.scala:196) at com.leyan.insight.Monitor$.$anonfun$main$20(Monitor.scala:204) at com.leyan.insight.Monitor$.$anonfun$main$20$adapted(Monitor.scala:183) at scala.collection.IndexedSeqOptimized.foreach(IndexedSeqOptimized.scala:36) at scala.collection.IndexedSeqOptimized.foreach$(IndexedSeqOptimized.scala:33) at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:198) at com.leyan.insight.Monitor$.$anonfun$main$13(Monitor.scala:183) at com.leyan.insight.Monitor$.$anonfun$main$13$adapted(Monitor.scala:122) at 
scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:238) {code} was: Generally, distributed jobs have two stages of committing files: committing each task's output files and committing the job's output files. If one attempt fails, another attempt will try to run the task again; after all tasks succeed, the job will commit the output of all tasks. But now if we run a dynamic partition overwrite job, for example, *INSERT OVERWRITE table dst partition(part) SELECT * from src*, then if one of the final stage tasks fails, the job will fail. The first task attempt's data writer in the final stage writes its output data directly to the Spark staging directory. If the first task attempt fails, the second task attempt's data writer will fail to set up, because the task's output file already exists. Then the job will fail. Therefore, I think we should write the temporary data to the task attempt's work directory, and commit its result files after the task attempt succeeds. Error Case !image-2020-07-23-10-39-18-877.png! !image-2020-07-23-10-41-31-701.png! 
Task Attempt2 Error LOG {code:java} 00:29:16.512 Executor task launch worker for task 3405 ERROR org.apache.spark.util.Utils: Aborting task org.apache.hadoop.fs.FileAlreadyExistsException: /data/hive/pdd/pdd_bot_marketing/app_order_reminder_dispatch_stats/.spark-staging-f6716d57-31eb-44f0-9f55-dfc0939e1fde/dt=20200716/part-00000-f6716d57-31eb-44f0-9f55-dfc0939e1fde.c000.snappy.parquet for client 192.168.127.9 already exists at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.startFileInternal(FSNamesystem.java:2938) at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.startFileInt(FSNamesystem.java:2827) at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.startFile(FSNamesystem.java:2712) at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.create(NameNodeRpcServer.java:604) at org.apache.hadoop.hdfs.server.namenode.AuthorizationProviderProxyClientProtocol.create(AuthorizationProviderProxyClientProtocol.java:115) at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.create(ClientNamenodeProtocolServerSideTranslatorPB.java:412) at org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java) at org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:617) at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:1073) at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2226) at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2222) at java.security.AccessController.doPrivileged(Native Method) at javax.security.auth.Subject.doAs(Subject.java:422) at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1920) at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2220) at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method) at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62) at 
sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45) at java.lang.reflect.Constructor.newInstance(Constructor.java:423) at org.apache.hadoop.ipc.RemoteException.instantiateException(RemoteException.java:106) at org.apache.hadoop.ipc.RemoteException.unwrapRemoteException(RemoteException.java:73) at org.apache.hadoop.hdfs.DFSOutputStream.newStreamForCreate(DFSOutputStream.java:2134) at org.apache.hadoop.hdfs.DFSClient.create(DFSClient.java:1781) at org.apache.hadoop.hdfs.DFSClient.create(DFSClient.java:1705) at org.apache.hadoop.hdfs.DistributedFileSystem$7.doCall(DistributedFileSystem.java:437) at org.apache.hadoop.hdfs.DistributedFileSystem$7.doCall(DistributedFileSystem.java:433) at org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81) at org.apache.hadoop.hdfs.DistributedFileSystem.create(DistributedFileSystem.java:433) at org.apache.hadoop.hdfs.DistributedFileSystem.create(DistributedFileSystem.java:374) at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:926) at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:907) at org.apache.parquet.hadoop.util.HadoopOutputFile.create(HadoopOutputFile.java:74) at org.apache.parquet.hadoop.ParquetFileWriter.<init>(ParquetFileWriter.java:248) at org.apache.parquet.hadoop.ParquetOutputFormat.getRecordWriter(ParquetOutputFormat.java:390) at org.apache.parquet.hadoop.ParquetOutputFormat.getRecordWriter(ParquetOutputFormat.java:349) at org.apache.spark.sql.execution.datasources.parquet.ParquetOutputWriter.<init>(ParquetOutputWriter.scala:37) at org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat$$anon$1.newInstance(ParquetFileFormat.scala:150) at org.apache.spark.sql.execution.datasources.DynamicPartitionDataWriter.newOutputWriter(FileFormatDataWriter.scala:241) at org.apache.spark.sql.execution.datasources.DynamicPartitionDataWriter.write(FileFormatDataWriter.scala:262) at 
org.apache.spark.sql.execution.datasources.FileFormatWriter$.$anonfun$executeTask$1(FileFormatWriter.scala:273) at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1411) at org.apache.spark.sql.execution.datasources.FileFormatWriter$.executeTask(FileFormatWriter.scala:281) at org.apache.spark.sql.execution.datasources.FileFormatWriter$.$anonfun$write$15(FileFormatWriter.scala:205) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90) at org.apache.spark.scheduler.Task.run(Task.scala:127) at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:444) at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1377) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:447) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:748).....; not retrying 00:29:16.583 main ERROR FileFormatWriter: Aborting job 6de09c5c-b425-4d01-b5c0-1aa0a6e3f58e. 
org.apache.spark.SparkException: Job aborted due to stage failure: Task 0.1 in stage 162.0 (TID 3405) can not write to output file: org.apache.hadoop.fs.FileAlreadyExistsException: /data/hive/pdd/pdd_bot_marketing/app_order_reminder_dispatch_stats/.spark-staging-f6716d57-31eb-44f0-9f55-dfc0939e1fde/dt=20200716/part-00000-f6716d57-31eb-44f0-9f55-dfc0939e1fde.c000.snappy.parquet for client 192.168.127.9 already exists at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.startFileInternal(FSNamesystem.java:2938) at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.startFileInt(FSNamesystem.java:2827) at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.startFile(FSNamesystem.java:2712) at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.create(NameNodeRpcServer.java:604) at org.apache.hadoop.hdfs.server.namenode.AuthorizationProviderProxyClientProtocol.create(AuthorizationProviderProxyClientProtocol.java:115) at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.create(ClientNamenodeProtocolServerSideTranslatorPB.java:412) at org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java) at org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:617) at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:1073) at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2226) at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2222) at java.security.AccessController.doPrivileged(Native Method) at javax.security.auth.Subject.doAs(Subject.java:422) at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1920) at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2220) at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2023) at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:1972) at 
org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:1971) at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62) at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55) at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49) at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1971) at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:950) at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:950) at scala.Option.foreach(Option.scala:407) at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:950) at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2203) at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2152) at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2203) at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2152) at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2141) at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49) at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:752) at org.apache.spark.SparkContext.runJob(SparkContext.scala:2093) at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:195) at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:178) at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult$lzycompute(commands.scala:108) at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult(commands.scala:106) at org.apache.spark.sql.execution.command.DataWritingCommandExec.executeCollect(commands.scala:120) at 
org.apache.spark.sql.Dataset.$anonfun$logicalPlan$1(Dataset.scala:229) at org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:3616) at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:100) at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:160) at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:87) at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:763) at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64) at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3614) at org.apache.spark.sql.Dataset.<init>(Dataset.scala:229) at org.apache.spark.sql.Dataset$.$anonfun$ofRows$2(Dataset.scala:100) at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:763) at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:97) at org.apache.spark.sql.SparkSession.$anonfun$sql$1(SparkSession.scala:606) at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:763) at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:601) at org.apache.spark.sql.SparkSqlRunner.run(SparkSqlRunner.scala:34) at com.leyan.insight.Monitor$.executeSql$1(Monitor.scala:196) at com.leyan.insight.Monitor$.$anonfun$main$20(Monitor.scala:204) at com.leyan.insight.Monitor$.$anonfun$main$20$adapted(Monitor.scala:183) at scala.collection.IndexedSeqOptimized.foreach(IndexedSeqOptimized.scala:36) at scala.collection.IndexedSeqOptimized.foreach$(IndexedSeqOptimized.scala:33) at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:198) at com.leyan.insight.Monitor$.$anonfun$main$13(Monitor.scala:183) at com.leyan.insight.Monitor$.$anonfun$main$13$adapted(Monitor.scala:122) at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:238) {code} > Commit taskattempt's output after succeed in dynamic partition datawriter > 
------------------------------------------------------------------------- > > Key: SPARK-32395 > URL: https://issues.apache.org/jira/browse/SPARK-32395 > Project: Spark > Issue Type: Bug > Components: SQL > Affects Versions: 3.0.0 > Reporter: Wan Kun > Priority: Major > Attachments: 111.png, 112.png > > > Generally, distributed jobs have two stages of committing files: committing > each task's output files and committing the job's output files. > If one attempt fails, another attempt will try to run the task again; after > all tasks succeed, the job will commit the output of all tasks. > But now if we run a dynamic partition overwrite job, for example, *INSERT > OVERWRITE table dst partition(part) SELECT * from src*, then if one of the > final stage tasks fails, the job will fail. > The first task attempt's data writer in the final stage writes its output data > directly to the Spark staging directory. If the first task attempt fails, the second > task attempt's data writer will fail to set up, because the task's output file > already exists. Then the job will fail. > Therefore, I think we should write the temporary data to the task attempt's > work directory, and commit its result files after the task attempt succeeds. > > Error Case > !111.png! > !112.png! 
> > Task Attempt2 Error LOG > > {code:java} > 00:29:16.512 Executor task launch worker for task 3405 ERROR > org.apache.spark.util.Utils: Aborting task > org.apache.hadoop.fs.FileAlreadyExistsException: > /data/hive/pdd/pdd_bot_marketing/app_order_reminder_dispatch_stats/.spark-staging-f6716d57-31eb-44f0-9f55-dfc0939e1fde/dt=20200716/part-00000-f6716d57-31eb-44f0-9f55-dfc0939e1fde.c000.snappy.parquet > for client 192.168.127.9 already exists > at > org.apache.hadoop.hdfs.server.namenode.FSNamesystem.startFileInternal(FSNamesystem.java:2938) > at > org.apache.hadoop.hdfs.server.namenode.FSNamesystem.startFileInt(FSNamesystem.java:2827) > at > org.apache.hadoop.hdfs.server.namenode.FSNamesystem.startFile(FSNamesystem.java:2712) > at > org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.create(NameNodeRpcServer.java:604) > at > org.apache.hadoop.hdfs.server.namenode.AuthorizationProviderProxyClientProtocol.create(AuthorizationProviderProxyClientProtocol.java:115) > at > org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.create(ClientNamenodeProtocolServerSideTranslatorPB.java:412) > at > org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java) > at > org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:617) > at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:1073) > at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2226) > at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2222) > at java.security.AccessController.doPrivileged(Native Method) > at javax.security.auth.Subject.doAs(Subject.java:422) > at > org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1920) > at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2220) > at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method) > at > 
sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62) > at > sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45) > at java.lang.reflect.Constructor.newInstance(Constructor.java:423) > at > org.apache.hadoop.ipc.RemoteException.instantiateException(RemoteException.java:106) > at > org.apache.hadoop.ipc.RemoteException.unwrapRemoteException(RemoteException.java:73) > at > org.apache.hadoop.hdfs.DFSOutputStream.newStreamForCreate(DFSOutputStream.java:2134) > at org.apache.hadoop.hdfs.DFSClient.create(DFSClient.java:1781) > at org.apache.hadoop.hdfs.DFSClient.create(DFSClient.java:1705) > at > org.apache.hadoop.hdfs.DistributedFileSystem$7.doCall(DistributedFileSystem.java:437) > at > org.apache.hadoop.hdfs.DistributedFileSystem$7.doCall(DistributedFileSystem.java:433) > at > org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81) > at > org.apache.hadoop.hdfs.DistributedFileSystem.create(DistributedFileSystem.java:433) > at > org.apache.hadoop.hdfs.DistributedFileSystem.create(DistributedFileSystem.java:374) > at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:926) > at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:907) > at > org.apache.parquet.hadoop.util.HadoopOutputFile.create(HadoopOutputFile.java:74) > at > org.apache.parquet.hadoop.ParquetFileWriter.<init>(ParquetFileWriter.java:248) > at > org.apache.parquet.hadoop.ParquetOutputFormat.getRecordWriter(ParquetOutputFormat.java:390) > at > org.apache.parquet.hadoop.ParquetOutputFormat.getRecordWriter(ParquetOutputFormat.java:349) > at > org.apache.spark.sql.execution.datasources.parquet.ParquetOutputWriter.<init>(ParquetOutputWriter.scala:37) > at > org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat$$anon$1.newInstance(ParquetFileFormat.scala:150) > at > 
org.apache.spark.sql.execution.datasources.DynamicPartitionDataWriter.newOutputWriter(FileFormatDataWriter.scala:241) > at > org.apache.spark.sql.execution.datasources.DynamicPartitionDataWriter.write(FileFormatDataWriter.scala:262) > at > org.apache.spark.sql.execution.datasources.FileFormatWriter$.$anonfun$executeTask$1(FileFormatWriter.scala:273) > at > org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1411) > at > org.apache.spark.sql.execution.datasources.FileFormatWriter$.executeTask(FileFormatWriter.scala:281) > at > org.apache.spark.sql.execution.datasources.FileFormatWriter$.$anonfun$write$15(FileFormatWriter.scala:205) > at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90) > at org.apache.spark.scheduler.Task.run(Task.scala:127) > at > org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:444) > at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1377) > at > org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:447) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) > at java.lang.Thread.run(Thread.java:748).....; not retrying > 00:29:16.583 main ERROR FileFormatWriter: Aborting job > 6de09c5c-b425-4d01-b5c0-1aa0a6e3f58e. 
> org.apache.spark.SparkException: Job aborted due to stage failure: Task 0.1 > in stage 162.0 (TID 3405) can not write to output file: > org.apache.hadoop.fs.FileAlreadyExistsException: > /data/hive/pdd/pdd_bot_marketing/app_order_reminder_dispatch_stats/.spark-staging-f6716d57-31eb-44f0-9f55-dfc0939e1fde/dt=20200716/part-00000-f6716d57-31eb-44f0-9f55-dfc0939e1fde.c000.snappy.parquet > for client 192.168.127.9 already exists > at > org.apache.hadoop.hdfs.server.namenode.FSNamesystem.startFileInternal(FSNamesystem.java:2938) > at > org.apache.hadoop.hdfs.server.namenode.FSNamesystem.startFileInt(FSNamesystem.java:2827) > at > org.apache.hadoop.hdfs.server.namenode.FSNamesystem.startFile(FSNamesystem.java:2712) > at > org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.create(NameNodeRpcServer.java:604) > at > org.apache.hadoop.hdfs.server.namenode.AuthorizationProviderProxyClientProtocol.create(AuthorizationProviderProxyClientProtocol.java:115) > at > org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.create(ClientNamenodeProtocolServerSideTranslatorPB.java:412) > at > org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java) > at > org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:617) > at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:1073) > at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2226) > at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2222) > at java.security.AccessController.doPrivileged(Native Method) > at javax.security.auth.Subject.doAs(Subject.java:422) > at > org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1920) > at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2220) > at > org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2023) > at > 
org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:1972) > at > org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:1971) > at > scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62) > at > scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55) > at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49) > at > org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1971) > at > org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:950) > at > org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:950) > at scala.Option.foreach(Option.scala:407) > at > org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:950) > at > org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2203) > at > org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2152) > at > org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2203) > at > org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2152) > at > org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2141) > at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49) > at > org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:752) > at org.apache.spark.SparkContext.runJob(SparkContext.scala:2093) > at > org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:195) > at > org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:178) > at > org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult$lzycompute(commands.scala:108) > at > 
org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult(commands.scala:106) > at > org.apache.spark.sql.execution.command.DataWritingCommandExec.executeCollect(commands.scala:120) > at > org.apache.spark.sql.Dataset.$anonfun$logicalPlan$1(Dataset.scala:229) > at > org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:3616) > at > org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:100) > at > org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:160) > at > org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:87) > at > org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:763) > at > org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64) > at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3614) > at org.apache.spark.sql.Dataset.<init>(Dataset.scala:229) > at org.apache.spark.sql.Dataset$.$anonfun$ofRows$2(Dataset.scala:100) > at > org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:763) > at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:97) > at > org.apache.spark.sql.SparkSession.$anonfun$sql$1(SparkSession.scala:606) > at > org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:763) > at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:601) > at org.apache.spark.sql.SparkSqlRunner.run(SparkSqlRunner.scala:34) > at com.leyan.insight.Monitor$.executeSql$1(Monitor.scala:196) > at com.leyan.insight.Monitor$.$anonfun$main$20(Monitor.scala:204) > at > com.leyan.insight.Monitor$.$anonfun$main$20$adapted(Monitor.scala:183) > at > scala.collection.IndexedSeqOptimized.foreach(IndexedSeqOptimized.scala:36) > at > scala.collection.IndexedSeqOptimized.foreach$(IndexedSeqOptimized.scala:33) > at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:198) > at com.leyan.insight.Monitor$.$anonfun$main$13(Monitor.scala:183) > at > 
com.leyan.insight.Monitor$.$anonfun$main$13$adapted(Monitor.scala:122) > at > scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:238) > {code} -- This message was sent by Atlassian Jira (v8.3.4#803005) --------------------------------------------------------------------- To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org For additional commands, e-mail: issues-help@spark.apache.org