[ https://issues.apache.org/jira/browse/SPARK-32395?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Wan Kun updated SPARK-32395: ---------------------------- Summary: Commit output files after task attempt succeed in dynamic partition datawriter (was: Commit taskattempt's output after succeed in dynamic partition datawriter) > Commit output files after task attempt succeed in dynamic partition > datawriter > ------------------------------------------------------------------------------- > > Key: SPARK-32395 > URL: https://issues.apache.org/jira/browse/SPARK-32395 > Project: Spark > Issue Type: Bug > Components: SQL > Affects Versions: 3.0.0 > Reporter: Wan Kun > Priority: Major > Attachments: 111.png, 112.png > > > Generally, distributed jobs have two stages of committing files: committing > task's output files and committing job's output files. If one attempt fails, > another attempt will try to run the task again, after all tasks succeed, the > job will commit the output of all tasks. > But now if we run a dynamic partition overwrite job, for example, *INSERT > OVERWRITE table dst partition(part) SELECT * from src*, then if one of the > final stage tasks fails, the job will fail. > The first task attempt datawriter in final stage writes the output data > directly to spark stage directory.If the first taskattempt fails, the second > taskattempt datawriter will fail to setup, because the task's output file is > already exists. Then the job will fail. > Therefore, I think we should write the temporary data to the task attempt's > work directory, and commit result files after the task attempt succeed. > > Error Case > !111.png! > !112.png! > > Task Attempt2 Error LOG > > {code:java} > 00:29:16.512 Executor task launch worker for task 3405 ERROR > org.apache.spark.util.Utils: Aborting task > org.apache.hadoop.fs.FileAlreadyExistsException: > /data/hive/pdd/pdd_bot_marketing/app_order_reminder_dispatch_stats/.spark-staging-f6716d57-31eb-44f0-9f55-dfc0939e1fde/dt=20200716/part-00000-f6716d57-31eb-44f0-9f55-dfc0939e1fde.c000.snappy.parquet > for client 192.168.127.9 already exists > at > org.apache.hadoop.hdfs.server.namenode.FSNamesystem.startFileInternal(FSNamesystem.java:2938) > at > org.apache.hadoop.hdfs.server.namenode.FSNamesystem.startFileInt(FSNamesystem.java:2827) > at > org.apache.hadoop.hdfs.server.namenode.FSNamesystem.startFile(FSNamesystem.java:2712) > at > org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.create(NameNodeRpcServer.java:604) > at > org.apache.hadoop.hdfs.server.namenode.AuthorizationProviderProxyClientProtocol.create(AuthorizationProviderProxyClientProtocol.java:115) > at > org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.create(ClientNamenodeProtocolServerSideTranslatorPB.java:412) > at > org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java) > at > org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:617) > at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:1073) > at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2226) > at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2222) > at java.security.AccessController.doPrivileged(Native Method) > at javax.security.auth.Subject.doAs(Subject.java:422) > at > org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1920) > at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2220) > at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method) > at > sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62) > at > sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45) > at java.lang.reflect.Constructor.newInstance(Constructor.java:423) > at > org.apache.hadoop.ipc.RemoteException.instantiateException(RemoteException.java:106) > at > org.apache.hadoop.ipc.RemoteException.unwrapRemoteException(RemoteException.java:73) > at > org.apache.hadoop.hdfs.DFSOutputStream.newStreamForCreate(DFSOutputStream.java:2134) > at org.apache.hadoop.hdfs.DFSClient.create(DFSClient.java:1781) > at org.apache.hadoop.hdfs.DFSClient.create(DFSClient.java:1705) > at > org.apache.hadoop.hdfs.DistributedFileSystem$7.doCall(DistributedFileSystem.java:437) > at > org.apache.hadoop.hdfs.DistributedFileSystem$7.doCall(DistributedFileSystem.java:433) > at > org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81) > at > org.apache.hadoop.hdfs.DistributedFileSystem.create(DistributedFileSystem.java:433) > at > org.apache.hadoop.hdfs.DistributedFileSystem.create(DistributedFileSystem.java:374) > at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:926) > at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:907) > at > org.apache.parquet.hadoop.util.HadoopOutputFile.create(HadoopOutputFile.java:74) > at > org.apache.parquet.hadoop.ParquetFileWriter.<init>(ParquetFileWriter.java:248) > at > org.apache.parquet.hadoop.ParquetOutputFormat.getRecordWriter(ParquetOutputFormat.java:390) > at > org.apache.parquet.hadoop.ParquetOutputFormat.getRecordWriter(ParquetOutputFormat.java:349) > at > org.apache.spark.sql.execution.datasources.parquet.ParquetOutputWriter.<init>(ParquetOutputWriter.scala:37) > at > org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat$$anon$1.newInstance(ParquetFileFormat.scala:150) > at > org.apache.spark.sql.execution.datasources.DynamicPartitionDataWriter.newOutputWriter(FileFormatDataWriter.scala:241) > at > org.apache.spark.sql.execution.datasources.DynamicPartitionDataWriter.write(FileFormatDataWriter.scala:262) > at > org.apache.spark.sql.execution.datasources.FileFormatWriter$.$anonfun$executeTask$1(FileFormatWriter.scala:273) > at > org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1411) > at > org.apache.spark.sql.execution.datasources.FileFormatWriter$.executeTask(FileFormatWriter.scala:281) > at > org.apache.spark.sql.execution.datasources.FileFormatWriter$.$anonfun$write$15(FileFormatWriter.scala:205) > at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90) > at org.apache.spark.scheduler.Task.run(Task.scala:127) > at > org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:444) > at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1377) > at > org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:447) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) > at java.lang.Thread.run(Thread.java:748).....; not retrying > 00:29:16.583 main ERROR FileFormatWriter: Aborting job > 6de09c5c-b425-4d01-b5c0-1aa0a6e3f58e. > org.apache.spark.SparkException: Job aborted due to stage failure: Task 0.1 > in stage 162.0 (TID 3405) can not write to output file: > org.apache.hadoop.fs.FileAlreadyExistsException: > /data/hive/pdd/pdd_bot_marketing/app_order_reminder_dispatch_stats/.spark-staging-f6716d57-31eb-44f0-9f55-dfc0939e1fde/dt=20200716/part-00000-f6716d57-31eb-44f0-9f55-dfc0939e1fde.c000.snappy.parquet > for client 192.168.127.9 already exists > at > org.apache.hadoop.hdfs.server.namenode.FSNamesystem.startFileInternal(FSNamesystem.java:2938) > at > org.apache.hadoop.hdfs.server.namenode.FSNamesystem.startFileInt(FSNamesystem.java:2827) > at > org.apache.hadoop.hdfs.server.namenode.FSNamesystem.startFile(FSNamesystem.java:2712) > at > org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.create(NameNodeRpcServer.java:604) > at > org.apache.hadoop.hdfs.server.namenode.AuthorizationProviderProxyClientProtocol.create(AuthorizationProviderProxyClientProtocol.java:115) > at > org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.create(ClientNamenodeProtocolServerSideTranslatorPB.java:412) > at > org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java) > at > org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:617) > at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:1073) > at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2226) > at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2222) > at java.security.AccessController.doPrivileged(Native Method) > at javax.security.auth.Subject.doAs(Subject.java:422) > at > org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1920) > at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2220) > at > org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2023) > at > org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:1972) > at > org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:1971) > at > scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62) > at > scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55) > at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49) > at > org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1971) > at > org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:950) > at > org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:950) > at scala.Option.foreach(Option.scala:407) > at > org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:950) > at > org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2203) > at > org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2152) > at > org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2203) > at > org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2152) > at > org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2141) > at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49) > at > org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:752) > at org.apache.spark.SparkContext.runJob(SparkContext.scala:2093) > at > org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:195) > at > org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:178) > at > org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult$lzycompute(commands.scala:108) > at > org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult(commands.scala:106) > at > org.apache.spark.sql.execution.command.DataWritingCommandExec.executeCollect(commands.scala:120) > at > org.apache.spark.sql.Dataset.$anonfun$logicalPlan$1(Dataset.scala:229) > at > org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:3616) > at > org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:100) > at > org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:160) > at > org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:87) > at > org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:763) > at > org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64) > at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3614) > at org.apache.spark.sql.Dataset.<init>(Dataset.scala:229) > at org.apache.spark.sql.Dataset$.$anonfun$ofRows$2(Dataset.scala:100) > at > org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:763) > at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:97) > at > org.apache.spark.sql.SparkSession.$anonfun$sql$1(SparkSession.scala:606) > at > org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:763) > at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:601) > at org.apache.spark.sql.SparkSqlRunner.run(SparkSqlRunner.scala:34) > at com.leyan.insight.Monitor$.executeSql$1(Monitor.scala:196) > at com.leyan.insight.Monitor$.$anonfun$main$20(Monitor.scala:204) > at > com.leyan.insight.Monitor$.$anonfun$main$20$adapted(Monitor.scala:183) > at > scala.collection.IndexedSeqOptimized.foreach(IndexedSeqOptimized.scala:36) > at > scala.collection.IndexedSeqOptimized.foreach$(IndexedSeqOptimized.scala:33) > at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:198) > at com.leyan.insight.Monitor$.$anonfun$main$13(Monitor.scala:183) > at > com.leyan.insight.Monitor$.$anonfun$main$13$adapted(Monitor.scala:122) > at > scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:238) > {code} -- This message was sent by Atlassian Jira (v8.3.4#803005) --------------------------------------------------------------------- To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org