[ https://issues.apache.org/jira/browse/SPARK-32395?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Wan Kun updated SPARK-32395:
----------------------------
    Summary: Commit output files after the task attempt succeeds in dynamic 
partition data writer  (was: Commit taskattempt's output after succeed in 
dynamic partition datawriter)

> Commit output files after the task attempt succeeds in dynamic partition 
> data writer
> -------------------------------------------------------------------------------
>
>                 Key: SPARK-32395
>                 URL: https://issues.apache.org/jira/browse/SPARK-32395
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 3.0.0
>            Reporter: Wan Kun
>            Priority: Major
>         Attachments: 111.png, 112.png
>
>
> Generally, distributed jobs commit output files in two stages: first each task 
> commits its own output files, and then the job commits the output of all tasks. 
> If one task attempt fails, another attempt will run the task again; once all 
> tasks have succeeded, the job commits their combined output.
>  But currently, if we run a dynamic partition overwrite job, for example *INSERT 
> OVERWRITE TABLE dst PARTITION(part) SELECT * FROM src*, and one of the 
> final-stage tasks fails, the whole job fails.
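>  For reference, a minimal sketch of such a job (the table names come from the 
> example above; everything else, including the app name, is hypothetical, and the 
> failure additionally requires the first attempt of a final-stage task to fail so 
> that a second attempt is scheduled):
> {code:scala}
> import org.apache.spark.sql.SparkSession
> 
> // Dynamic partition overwrite into a partitioned table. The retry failure only
> // shows up when the first attempt of a final-stage task fails (e.g. executor
> // loss) and a second attempt tries to create the same staging file.
> val spark = SparkSession.builder()
>   .appName("SPARK-32395-repro")
>   .config("spark.sql.sources.partitionOverwriteMode", "dynamic")
>   .enableHiveSupport()
>   .getOrCreate()
> 
> spark.sql("INSERT OVERWRITE TABLE dst PARTITION(part) SELECT * FROM src")
> {code}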
>  In the final stage, the first task attempt's data writer writes its output 
> files directly into the Spark staging directory. If that first attempt fails, 
> the second attempt's data writer fails to set up because the task's output file 
> already exists, and so the job fails.
>  Therefore, I think we should write the temporary data into the task attempt's 
> work directory and commit the result files only after the task attempt succeeds.
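>  A rough sketch of that idea is below. It is not the actual FileFormatDataWriter 
> change; the class and method names are hypothetical, and it only uses plain 
> Hadoop FileSystem calls to illustrate the per-attempt work directory plus 
> commit-on-success layout:
> {code:scala}
> import org.apache.hadoop.conf.Configuration
> import org.apache.hadoop.fs.{FileSystem, Path}
> 
> // Hypothetical helper: every task attempt writes under its own work directory
> // and publishes files into the shared staging directory only when it succeeds.
> class TaskAttemptWorkDir(stagingDir: Path, attemptId: String, conf: Configuration) {
>   private val fs: FileSystem = stagingDir.getFileSystem(conf)
>   // e.g. .../.spark-staging-<jobId>/_temporary/<attemptId>
>   val workDir = new Path(new Path(stagingDir, "_temporary"), attemptId)
> 
>   /** Path a writer should use while the attempt is still running. */
>   def newOutputPath(relative: String): Path = new Path(workDir, relative)
> 
>   /** Called only after the task attempt succeeds: move files into stagingDir. */
>   def commit(): Unit = {
>     val files = fs.listFiles(workDir, true)
>     while (files.hasNext) {
>       val src = files.next().getPath
>       val relative = src.toString.stripPrefix(workDir.toString + "/")
>       val dest = new Path(stagingDir, relative)
>       fs.mkdirs(dest.getParent)
>       fs.rename(src, dest) // a failed attempt never reaches this point
>     }
>     fs.delete(workDir, true)
>   }
> 
>   /** Called when the attempt fails or is aborted: drop its private directory. */
>   def abort(): Unit = fs.delete(workDir, true)
> }
> {code}
>  With this layout a retried attempt starts from an empty work directory of its 
> own, so the FileAlreadyExistsException shown in the log below cannot occur.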
>  
> Error Case
>   !111.png!
> !112.png!
>  
> Task Attempt 2 Error Log
>  
> {code:java}
> 00:29:16.512 Executor task launch worker for task 3405 ERROR 
> org.apache.spark.util.Utils: Aborting task
> org.apache.hadoop.fs.FileAlreadyExistsException: 
> /data/hive/pdd/pdd_bot_marketing/app_order_reminder_dispatch_stats/.spark-staging-f6716d57-31eb-44f0-9f55-dfc0939e1fde/dt=20200716/part-00000-f6716d57-31eb-44f0-9f55-dfc0939e1fde.c000.snappy.parquet
>  for client 192.168.127.9 already exists
>         at 
> org.apache.hadoop.hdfs.server.namenode.FSNamesystem.startFileInternal(FSNamesystem.java:2938)
>         at 
> org.apache.hadoop.hdfs.server.namenode.FSNamesystem.startFileInt(FSNamesystem.java:2827)
>         at 
> org.apache.hadoop.hdfs.server.namenode.FSNamesystem.startFile(FSNamesystem.java:2712)
>         at 
> org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.create(NameNodeRpcServer.java:604)
>         at 
> org.apache.hadoop.hdfs.server.namenode.AuthorizationProviderProxyClientProtocol.create(AuthorizationProviderProxyClientProtocol.java:115)
>         at 
> org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.create(ClientNamenodeProtocolServerSideTranslatorPB.java:412)
>         at 
> org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)
>         at 
> org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:617)
>         at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:1073)
>         at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2226)
>         at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2222)
>         at java.security.AccessController.doPrivileged(Native Method)
>         at javax.security.auth.Subject.doAs(Subject.java:422)
>         at 
> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1920)
>         at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2220)
>         at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
>         at 
> sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
>         at 
> sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
>         at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
>         at 
> org.apache.hadoop.ipc.RemoteException.instantiateException(RemoteException.java:106)
>         at 
> org.apache.hadoop.ipc.RemoteException.unwrapRemoteException(RemoteException.java:73)
>         at 
> org.apache.hadoop.hdfs.DFSOutputStream.newStreamForCreate(DFSOutputStream.java:2134)
>         at org.apache.hadoop.hdfs.DFSClient.create(DFSClient.java:1781)
>         at org.apache.hadoop.hdfs.DFSClient.create(DFSClient.java:1705)
>         at 
> org.apache.hadoop.hdfs.DistributedFileSystem$7.doCall(DistributedFileSystem.java:437)
>         at 
> org.apache.hadoop.hdfs.DistributedFileSystem$7.doCall(DistributedFileSystem.java:433)
>         at 
> org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
>         at 
> org.apache.hadoop.hdfs.DistributedFileSystem.create(DistributedFileSystem.java:433)
>         at 
> org.apache.hadoop.hdfs.DistributedFileSystem.create(DistributedFileSystem.java:374)
>         at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:926)
>         at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:907)
>         at 
> org.apache.parquet.hadoop.util.HadoopOutputFile.create(HadoopOutputFile.java:74)
>         at 
> org.apache.parquet.hadoop.ParquetFileWriter.<init>(ParquetFileWriter.java:248)
>         at 
> org.apache.parquet.hadoop.ParquetOutputFormat.getRecordWriter(ParquetOutputFormat.java:390)
>         at 
> org.apache.parquet.hadoop.ParquetOutputFormat.getRecordWriter(ParquetOutputFormat.java:349)
>           at 
> org.apache.spark.sql.execution.datasources.parquet.ParquetOutputWriter.<init>(ParquetOutputWriter.scala:37)
>         at 
> org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat$$anon$1.newInstance(ParquetFileFormat.scala:150)
>         at 
> org.apache.spark.sql.execution.datasources.DynamicPartitionDataWriter.newOutputWriter(FileFormatDataWriter.scala:241)
>         at 
> org.apache.spark.sql.execution.datasources.DynamicPartitionDataWriter.write(FileFormatDataWriter.scala:262)
>         at 
> org.apache.spark.sql.execution.datasources.FileFormatWriter$.$anonfun$executeTask$1(FileFormatWriter.scala:273)
>         at 
> org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1411)
>         at 
> org.apache.spark.sql.execution.datasources.FileFormatWriter$.executeTask(FileFormatWriter.scala:281)
>         at 
> org.apache.spark.sql.execution.datasources.FileFormatWriter$.$anonfun$write$15(FileFormatWriter.scala:205)
>         at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
>         at org.apache.spark.scheduler.Task.run(Task.scala:127)
>         at 
> org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:444)
>         at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1377)
>         at 
> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:447)
>         at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>         at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>         at java.lang.Thread.run(Thread.java:748).....; not retrying
> 00:29:16.583 main ERROR FileFormatWriter: Aborting job 
> 6de09c5c-b425-4d01-b5c0-1aa0a6e3f58e.
> org.apache.spark.SparkException: Job aborted due to stage failure: Task 0.1 
> in stage 162.0 (TID 3405) can not write to output file: 
> org.apache.hadoop.fs.FileAlreadyExistsException: 
> /data/hive/pdd/pdd_bot_marketing/app_order_reminder_dispatch_stats/.spark-staging-f6716d57-31eb-44f0-9f55-dfc0939e1fde/dt=20200716/part-00000-f6716d57-31eb-44f0-9f55-dfc0939e1fde.c000.snappy.parquet
>  for client 192.168.127.9 already exists
>         at 
> org.apache.hadoop.hdfs.server.namenode.FSNamesystem.startFileInternal(FSNamesystem.java:2938)
>         at 
> org.apache.hadoop.hdfs.server.namenode.FSNamesystem.startFileInt(FSNamesystem.java:2827)
>         at 
> org.apache.hadoop.hdfs.server.namenode.FSNamesystem.startFile(FSNamesystem.java:2712)
>         at 
> org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.create(NameNodeRpcServer.java:604)
>         at 
> org.apache.hadoop.hdfs.server.namenode.AuthorizationProviderProxyClientProtocol.create(AuthorizationProviderProxyClientProtocol.java:115)
>         at 
> org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.create(ClientNamenodeProtocolServerSideTranslatorPB.java:412)
>         at 
> org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)
>         at 
> org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:617)
>         at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:1073)
>         at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2226)
>         at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2222)
>         at java.security.AccessController.doPrivileged(Native Method)
>         at javax.security.auth.Subject.doAs(Subject.java:422)
>         at 
> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1920)
>         at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2220)
>         at 
> org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2023)
>         at 
> org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:1972)
>         at 
> org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:1971)
>         at 
> scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
>         at 
> scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
>         at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
>         at 
> org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1971)
>         at 
> org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:950)
>         at 
> org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:950)
>         at scala.Option.foreach(Option.scala:407)
>         at 
> org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:950)
>         at 
> org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2203)
>         at 
> org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2152)
>         at 
> org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2203)
>         at 
> org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2152)
>         at 
> org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2141)
>         at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
>         at 
> org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:752)
>         at org.apache.spark.SparkContext.runJob(SparkContext.scala:2093)
>         at 
> org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:195)
>         at 
> org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:178)
>         at 
> org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult$lzycompute(commands.scala:108)
>         at 
> org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult(commands.scala:106)
>         at 
> org.apache.spark.sql.execution.command.DataWritingCommandExec.executeCollect(commands.scala:120)
>         at 
> org.apache.spark.sql.Dataset.$anonfun$logicalPlan$1(Dataset.scala:229)
>         at 
> org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:3616)
>         at 
> org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:100)
>         at 
> org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:160)
>         at 
> org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:87)
>         at 
> org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:763)
>         at 
> org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
>         at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3614)
>         at org.apache.spark.sql.Dataset.<init>(Dataset.scala:229)
>         at org.apache.spark.sql.Dataset$.$anonfun$ofRows$2(Dataset.scala:100)
>         at 
> org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:763)
>         at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:97)
>         at 
> org.apache.spark.sql.SparkSession.$anonfun$sql$1(SparkSession.scala:606)
>         at 
> org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:763)
>         at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:601)
>         at org.apache.spark.sql.SparkSqlRunner.run(SparkSqlRunner.scala:34)
>         at com.leyan.insight.Monitor$.executeSql$1(Monitor.scala:196)
>         at com.leyan.insight.Monitor$.$anonfun$main$20(Monitor.scala:204)
>         at 
> com.leyan.insight.Monitor$.$anonfun$main$20$adapted(Monitor.scala:183)
>         at 
> scala.collection.IndexedSeqOptimized.foreach(IndexedSeqOptimized.scala:36)
>         at 
> scala.collection.IndexedSeqOptimized.foreach$(IndexedSeqOptimized.scala:33)
>         at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:198)
>         at com.leyan.insight.Monitor$.$anonfun$main$13(Monitor.scala:183)
>         at 
> com.leyan.insight.Monitor$.$anonfun$main$13$adapted(Monitor.scala:122)
>         at 
> scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:238)
> {code}


