[ https://issues.apache.org/jira/browse/SPARK-27030?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17719158#comment-17719158 ]

Shrikant Prasad commented on SPARK-27030:
-----------------------------------------

[~lev] [~shivuson...@gmail.com] Did you find any resolution for this issue? I 
am encountering the same error.

> DataFrameWriter.insertInto fails when writing in parallel to a hive table
> -------------------------------------------------------------------------
>
>                 Key: SPARK-27030
>                 URL: https://issues.apache.org/jira/browse/SPARK-27030
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 2.4.0
>            Reporter: Lev Katzav
>            Priority: Major
>              Labels: bulk-closed
>
> When writing to a Hive table, the following temporary directory is used:
> {code:java}
> /path/to/table/_temporary/0/{code}
> (the 0 at the end comes from the config
> {code:java}
> "mapreduce.job.application.attempt.id"{code}
> which is not set, so the committer falls back to 0)
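>
> A minimal sketch of how that fallback plays out (the path below is illustrative): the committer reads the attempt id from the Hadoop configuration, defaults to 0 when the key is unset, and so every concurrent job resolves the same pending directory:
> {code:java}
> import org.apache.hadoop.conf.Configuration
>
> val hadoopConf = new Configuration()
> // the key is not set, so getInt returns the default value 0
> val appAttemptId = hadoopConf.getInt("mapreduce.job.application.attempt.id", 0)
> // every job writing to the table therefore shares the same pending directory
> val pendingDir = s"/path/to/table/_temporary/$appAttemptId" // /path/to/table/_temporary/0
> {code}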
> When two processes write to the same table, the following race condition can occur:
>  # p1 creates the temp folder and starts using it
>  # p2 starts using the same temp folder
>  # p1 finishes and deletes the temp folder
>  # p2 fails because the temp folder is missing
>  
> It is possible to reproduce this error locally with the following code
> (the code runs locally, but I experienced the same error when running two jobs
> that write to the same table on a cluster):
> {code:java}
> import org.apache.spark.sql.SaveMode
> import org.apache.spark.sql.functions._
> import scala.collection.parallel.ForkJoinTaskSupport
> import scala.concurrent.forkjoin.ForkJoinPool
>
> val df = spark
>   .range(1000)
>   .toDF("a")
>   .withColumn("partition", lit(0))
>   .cache()
>
> // create the database
> sqlContext.sql("CREATE DATABASE IF NOT EXISTS db").count()
>
> // create the partitioned table
> df.write
>   .partitionBy("partition")
>   .saveAsTable("db.table")
>
> // insert into different partitions in parallel from 10 threads
> val x = (1 to 100).par
> x.tasksupport = new ForkJoinTaskSupport(new ForkJoinPool(10))
> x.foreach { p =>
>   val df2 = df.withColumn("partition", lit(p))
>   df2.write
>     .mode(SaveMode.Overwrite)
>     .insertInto("db.table")
> }
> {code}
>  
> The error is:
> {code:java}
> java.io.FileNotFoundException: File file:/path/to/warehouse/db.db/table/_temporary/0 does not exist
>  at org.apache.hadoop.fs.RawLocalFileSystem.listStatus(RawLocalFileSystem.java:406)
>  at org.apache.hadoop.fs.FileSystem.listStatus(FileSystem.java:1497)
>  at org.apache.hadoop.fs.FileSystem.listStatus(FileSystem.java:1537)
>  at org.apache.hadoop.fs.ChecksumFileSystem.listStatus(ChecksumFileSystem.java:669)
>  at org.apache.hadoop.fs.FileSystem.listStatus(FileSystem.java:1497)
>  at org.apache.hadoop.fs.FileSystem.listStatus(FileSystem.java:1537)
>  at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.getAllCommittedTaskPaths(FileOutputCommitter.java:283)
>  at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.commitJob(FileOutputCommitter.java:325)
>  at org.apache.parquet.hadoop.ParquetOutputCommitter.commitJob(ParquetOutputCommitter.java:48)
>  at org.apache.spark.internal.io.HadoopMapReduceCommitProtocol.commitJob(HadoopMapReduceCommitProtocol.scala:166)
>  at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:185)
>  at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:159)
>  at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult$lzycompute(commands.scala:104)
>  at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult(commands.scala:102)
>  at org.apache.spark.sql.execution.command.DataWritingCommandExec.doExecute(commands.scala:122)
>  at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
>  at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
>  at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
>  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
>  at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
>  at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
>  at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:80)
>  at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:80)
>  at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:668)
>  at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:668)
>  at org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:78)
>  at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:125)
>  at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:73)
>  at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:668)
>  at org.apache.spark.sql.DataFrameWriter.insertInto(DataFrameWriter.scala:325)
>  at org.apache.spark.sql.DataFrameWriter.insertInto(DataFrameWriter.scala:311)
>  at company.name.spark.hive.SparkHiveUtilsTest$$anonfun$3$$anonfun$apply$mcV$sp$1.apply$mcVI$sp(SparkHiveUtilsTest.scala:190)
>  at scala.collection.parallel.immutable.ParRange$ParRangeIterator.foreach(ParRange.scala:91)
>  at scala.collection.parallel.ParIterableLike$Foreach.leaf(ParIterableLike.scala:972)
>  at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply$mcV$sp(Tasks.scala:49)
>  at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply(Tasks.scala:48)
>  at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply(Tasks.scala:48)
>  at scala.collection.parallel.Task$class.tryLeaf(Tasks.scala:51)
>  at scala.collection.parallel.ParIterableLike$Foreach.tryLeaf(ParIterableLike.scala:969)
>  at scala.collection.parallel.AdaptiveWorkStealingTasks$WrappedTask$class.internal(Tasks.scala:159)
>  at scala.collection.parallel.AdaptiveWorkStealingForkJoinTasks$WrappedTask.internal(Tasks.scala:443)
>  at scala.collection.parallel.AdaptiveWorkStealingTasks$WrappedTask$class.compute(Tasks.scala:149)
>  at scala.collection.parallel.AdaptiveWorkStealingForkJoinTasks$WrappedTask.compute(Tasks.scala:443)
>  at scala.concurrent.forkjoin.RecursiveAction.exec(RecursiveAction.java:160)
>  at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>  at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>  at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>  at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107){code}
>  
> A possible workaround I found is to set the config 
> "mapreduce.job.application.attempt.id" to a different random integer in each 
> job's SparkConf, which makes each job write to a different temp path. However, 
> this does not work when the jobs share a single Spark context (since the 
> config is then the same for all of them).
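>
> A minimal sketch of that workaround, assuming each writer runs as its own Spark application (the app name is illustrative; only the config key comes from above):
> {code:java}
> import scala.util.Random
> import org.apache.spark.sql.SparkSession
>
> val spark = SparkSession.builder()
>   .appName("parallel-insert-workaround") // illustrative
>   .enableHiveSupport()
>   .getOrCreate()
>
> // give this job its own attempt id so it commits under
> // /path/to/table/_temporary/<attemptId> instead of the shared .../_temporary/0
> spark.sparkContext.hadoopConfiguration
>   .set("mapreduce.job.application.attempt.id", Random.nextInt(Int.MaxValue).toString)
> {code}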
>  
>  


