[ https://issues.apache.org/jira/browse/SPARK-32395?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Wan Kun updated SPARK-32395:
----------------------------
    Description: 
Generally, distributed jobs commit files in two phases: committing each task's output files and then committing the job's output files.
 If one task attempt fails, another attempt runs the task again; after all tasks succeed, the job commits the output of all tasks.
 But with a dynamic partition overwrite job, for example *INSERT OVERWRITE TABLE dst PARTITION(part) SELECT * FROM src*, the job currently fails as soon as one of the final-stage tasks fails.
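A minimal way to hit this scenario is sketched below. This is a hypothetical repro, not taken from the report: the table names (dst, src) are illustrative, `spark` is an active SparkSession, and the essential ingredients are dynamic overwrite mode plus a final-stage task that fails once and is retried.

{code:scala}
// Hypothetical repro sketch; `src` is assumed to be an existing source table.
spark.conf.set("spark.sql.sources.partitionOverwriteMode", "dynamic")

spark.sql("CREATE TABLE dst (id INT, part STRING) USING parquet PARTITIONED BY (part)")

// If a task of this final write stage fails on its first attempt (lost executor,
// preemption, ...), the retried attempt tries to create the very same file under
// .spark-staging-<jobId>/part=.../ and hits FileAlreadyExistsException.
spark.sql("INSERT OVERWRITE TABLE dst PARTITION(part) SELECT id, part FROM src")
{code}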
 In the final stage, the first task attempt's data writer writes its output directly into the Spark staging directory. If that attempt fails, the second task attempt's data writer fails during setup because the task's output file already exists, and then the whole job fails.
 Therefore, I think we should write the temporary data into the task attempt's work directory and commit its result files only after the task attempt succeeds.
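Conceptually, the change is about where the data writer places its files. The following is an illustrative sketch of the intended layout only, not the actual Spark internals; the writer involved is DynamicPartitionDataWriter in FileFormatDataWriter.scala (see the stack trace below), and the directory names here are assumptions.

{code:scala}
import org.apache.hadoop.fs.Path

val jobId = "f6716d57-31eb-44f0-9f55-dfc0939e1fde"      // job UUID from the log below
val stagingDir = new Path(s".spark-staging-$jobId")     // shared job staging directory

// Today: every attempt of a task writes the same file name directly under the
// staging directory, so a retried attempt collides with the failed attempt's file:
//   .spark-staging-<jobId>/dt=20200716/part-00000-<jobId>.c000.snappy.parquet

// Proposed: each attempt writes under its own work directory ...
def attemptWorkDir(partitionId: Int, attemptNumber: Int): Path =
  new Path(stagingDir, s"_temporary/attempt_${partitionId}_$attemptNumber")

// ... and only a successful attempt moves its files into the staging directory
// when the task is committed (see the commit sketch after the error log below).
{code}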

 

Error Case

  !111.png!

!112.png!

 

Task Attempt 2 Error Log

 
{code:java}
00:29:16.512 Executor task launch worker for task 3405 ERROR 
org.apache.spark.util.Utils: Aborting task
org.apache.hadoop.fs.FileAlreadyExistsException: 
/data/hive/pdd/pdd_bot_marketing/app_order_reminder_dispatch_stats/.spark-staging-f6716d57-31eb-44f0-9f55-dfc0939e1fde/dt=20200716/part-00000-f6716d57-31eb-44f0-9f55-dfc0939e1fde.c000.snappy.parquet
 for client 192.168.127.9 already exists
        at 
org.apache.hadoop.hdfs.server.namenode.FSNamesystem.startFileInternal(FSNamesystem.java:2938)
        at 
org.apache.hadoop.hdfs.server.namenode.FSNamesystem.startFileInt(FSNamesystem.java:2827)
        at 
org.apache.hadoop.hdfs.server.namenode.FSNamesystem.startFile(FSNamesystem.java:2712)
        at 
org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.create(NameNodeRpcServer.java:604)
        at 
org.apache.hadoop.hdfs.server.namenode.AuthorizationProviderProxyClientProtocol.create(AuthorizationProviderProxyClientProtocol.java:115)
        at 
org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.create(ClientNamenodeProtocolServerSideTranslatorPB.java:412)
        at 
org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)
        at 
org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:617)
        at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:1073)
        at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2226)
        at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2222)
        at java.security.AccessController.doPrivileged(Native Method)
        at javax.security.auth.Subject.doAs(Subject.java:422)
        at 
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1920)
        at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2220)
        at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
        at 
sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
        at 
sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
        at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
        at 
org.apache.hadoop.ipc.RemoteException.instantiateException(RemoteException.java:106)
        at 
org.apache.hadoop.ipc.RemoteException.unwrapRemoteException(RemoteException.java:73)
        at 
org.apache.hadoop.hdfs.DFSOutputStream.newStreamForCreate(DFSOutputStream.java:2134)
        at org.apache.hadoop.hdfs.DFSClient.create(DFSClient.java:1781)
        at org.apache.hadoop.hdfs.DFSClient.create(DFSClient.java:1705)
        at 
org.apache.hadoop.hdfs.DistributedFileSystem$7.doCall(DistributedFileSystem.java:437)
        at 
org.apache.hadoop.hdfs.DistributedFileSystem$7.doCall(DistributedFileSystem.java:433)
        at 
org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
        at 
org.apache.hadoop.hdfs.DistributedFileSystem.create(DistributedFileSystem.java:433)
        at 
org.apache.hadoop.hdfs.DistributedFileSystem.create(DistributedFileSystem.java:374)
        at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:926)
        at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:907)
        at 
org.apache.parquet.hadoop.util.HadoopOutputFile.create(HadoopOutputFile.java:74)
        at 
org.apache.parquet.hadoop.ParquetFileWriter.<init>(ParquetFileWriter.java:248)
        at 
org.apache.parquet.hadoop.ParquetOutputFormat.getRecordWriter(ParquetOutputFormat.java:390)
        at 
org.apache.parquet.hadoop.ParquetOutputFormat.getRecordWriter(ParquetOutputFormat.java:349)
        at 
org.apache.spark.sql.execution.datasources.parquet.ParquetOutputWriter.<init>(ParquetOutputWriter.scala:37)
        at 
org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat$$anon$1.newInstance(ParquetFileFormat.scala:150)
        at 
org.apache.spark.sql.execution.datasources.DynamicPartitionDataWriter.newOutputWriter(FileFormatDataWriter.scala:241)
        at 
org.apache.spark.sql.execution.datasources.DynamicPartitionDataWriter.write(FileFormatDataWriter.scala:262)
        at 
org.apache.spark.sql.execution.datasources.FileFormatWriter$.$anonfun$executeTask$1(FileFormatWriter.scala:273)
        at 
org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1411)
        at 
org.apache.spark.sql.execution.datasources.FileFormatWriter$.executeTask(FileFormatWriter.scala:281)
        at 
org.apache.spark.sql.execution.datasources.FileFormatWriter$.$anonfun$write$15(FileFormatWriter.scala:205)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
        at org.apache.spark.scheduler.Task.run(Task.scala:127)
        at 
org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:444)
        at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1377)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:447)
        at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748).....; not retrying
00:29:16.583 main ERROR FileFormatWriter: Aborting job 
6de09c5c-b425-4d01-b5c0-1aa0a6e3f58e.
org.apache.spark.SparkException: Job aborted due to stage failure: Task 0.1 in 
stage 162.0 (TID 3405) can not write to output file: 
org.apache.hadoop.fs.FileAlreadyExistsException: 
/data/hive/pdd/pdd_bot_marketing/app_order_reminder_dispatch_stats/.spark-staging-f6716d57-31eb-44f0-9f55-dfc0939e1fde/dt=20200716/part-00000-f6716d57-31eb-44f0-9f55-dfc0939e1fde.c000.snappy.parquet
 for client 192.168.127.9 already exists
        at 
org.apache.hadoop.hdfs.server.namenode.FSNamesystem.startFileInternal(FSNamesystem.java:2938)
        at 
org.apache.hadoop.hdfs.server.namenode.FSNamesystem.startFileInt(FSNamesystem.java:2827)
        at 
org.apache.hadoop.hdfs.server.namenode.FSNamesystem.startFile(FSNamesystem.java:2712)
        at 
org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.create(NameNodeRpcServer.java:604)
        at 
org.apache.hadoop.hdfs.server.namenode.AuthorizationProviderProxyClientProtocol.create(AuthorizationProviderProxyClientProtocol.java:115)
        at 
org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.create(ClientNamenodeProtocolServerSideTranslatorPB.java:412)
        at 
org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)
        at 
org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:617)
        at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:1073)
        at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2226)
        at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2222)
        at java.security.AccessController.doPrivileged(Native Method)
        at javax.security.auth.Subject.doAs(Subject.java:422)
        at 
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1920)
        at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2220)
        at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2023)
        at 
org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:1972)
        at 
org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:1971)
        at 
scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
        at 
scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
        at 
org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1971)
        at 
org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:950)
        at 
org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:950)
        at scala.Option.foreach(Option.scala:407)
        at 
org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:950)
        at 
org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2203)
        at 
org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2152)
        at 
org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2203)
        at 
org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2152)
        at 
org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2141)
        at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
        at 
org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:752)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:2093)
        at 
org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:195)
        at 
org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:178)
        at 
org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult$lzycompute(commands.scala:108)
        at 
org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult(commands.scala:106)
        at 
org.apache.spark.sql.execution.command.DataWritingCommandExec.executeCollect(commands.scala:120)
        at 
org.apache.spark.sql.Dataset.$anonfun$logicalPlan$1(Dataset.scala:229)
        at 
org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:3616)
        at 
org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:100)
        at 
org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:160)
        at 
org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:87)
        at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:763)
        at 
org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
        at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3614)
        at org.apache.spark.sql.Dataset.<init>(Dataset.scala:229)
        at org.apache.spark.sql.Dataset$.$anonfun$ofRows$2(Dataset.scala:100)
        at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:763)
        at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:97)
        at 
org.apache.spark.sql.SparkSession.$anonfun$sql$1(SparkSession.scala:606)
        at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:763)
        at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:601)
        at org.apache.spark.sql.SparkSqlRunner.run(SparkSqlRunner.scala:34)
        at com.leyan.insight.Monitor$.executeSql$1(Monitor.scala:196)
        at com.leyan.insight.Monitor$.$anonfun$main$20(Monitor.scala:204)
        at 
com.leyan.insight.Monitor$.$anonfun$main$20$adapted(Monitor.scala:183)
        at 
scala.collection.IndexedSeqOptimized.foreach(IndexedSeqOptimized.scala:36)
        at 
scala.collection.IndexedSeqOptimized.foreach$(IndexedSeqOptimized.scala:33)
        at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:198)
        at com.leyan.insight.Monitor$.$anonfun$main$13(Monitor.scala:183)
        at 
com.leyan.insight.Monitor$.$anonfun$main$13$adapted(Monitor.scala:122)
        at 
scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:238)
{code}
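
For reference, the task-commit half of the idea could look roughly like the sketch below: once an attempt finishes successfully, its files are renamed from its work directory into the shared staging directory, and a failed attempt simply deletes its work directory on abort. Method names, signatures, and directory layout here are assumptions for illustration, not the actual patch.

{code:scala}
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

// Hedged sketch of the proposed task-commit step (illustrative names only).
def commitTaskAttempt(attemptWorkDir: Path, stagingDir: Path, conf: Configuration): Unit = {
  val fs: FileSystem = attemptWorkDir.getFileSystem(conf)
  val workDir = fs.makeQualified(attemptWorkDir)
  if (fs.exists(workDir)) {
    val files = fs.listFiles(workDir, true) // recurse into partition sub-directories
    while (files.hasNext) {
      val src = files.next().getPath
      // Rebuild the attempt-relative path (e.g. dt=20200716/part-00000-....parquet)
      // under the shared staging directory.
      val relative = src.toString.stripPrefix(workDir.toString + Path.SEPARATOR)
      val dest = new Path(stagingDir, relative)
      fs.mkdirs(dest.getParent)
      if (!fs.rename(src, dest)) {
        throw new java.io.IOException(s"Failed to commit $src to $dest")
      }
    }
    fs.delete(workDir, true) // drop the attempt's now-empty work directory
  }
}
// A failed attempt only deletes its own work directory on abort, so the retried
// attempt starts from a clean slate and never hits FileAlreadyExistsException.
{code}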



> Commit taskattempt's output after succeed in dynamic partition datawriter
> -------------------------------------------------------------------------
>
>                 Key: SPARK-32395
>                 URL: https://issues.apache.org/jira/browse/SPARK-32395
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 3.0.0
>            Reporter: Wan Kun
>            Priority: Major
>         Attachments: 111.png, 112.png
>



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org
