I am trying to prototype using a single SQLContext instance to append DataFrames, partitioned by a field, to the same HDFS folder from multiple threads. (Each thread works with a DataFrame having a different partition column value.) I get the following exception:

16/09/24 16:45:12 ERROR [ForkJoinPool-3-worker-13] InsertIntoHadoopFsRelation: Aborting job.
java.io.FileNotFoundException: File hdfs://localhost:9000/user/temp/person/_temporary/0/task_201609241645_0001_m_000000/country=UK-1 does not exist.
        at org.apache.hadoop.hdfs.DistributedFileSystem.listStatusInternal(DistributedFileSystem.java:644)
        at org.apache.hadoop.hdfs.DistributedFileSystem.access$600(DistributedFileSystem.java:92)
        at org.apache.hadoop.hdfs.DistributedFileSystem$14.doCall(DistributedFileSystem.java:702)
        at org.apache.hadoop.hdfs.DistributedFileSystem$14.doCall(DistributedFileSystem.java:698)
        at org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
        at org.apache.hadoop.hdfs.DistributedFileSystem.listStatus(DistributedFileSystem.java:698)
        at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.mergePaths(FileOutputCommitter.java:360)
        at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.mergePaths(FileOutputCommitter.java:362)
        at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.commitJob(FileOutputCommitter.java:310)
        at org.apache.parquet.hadoop.ParquetOutputCommitter.commitJob(ParquetOutputCommitter.java:46)
        at org.apache.spark.sql.execution.datasources.BaseWriterContainer.commitJob(WriterContainer.scala:230)
        at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation$$anonfun$run$1.apply$mcV$sp(InsertIntoHadoopFsRelation.scala:149)
        at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation$$anonfun$run$1.apply(InsertIntoHadoopFsRelation.scala:106)
        at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation$$anonfun$run$1.apply(InsertIntoHadoopFsRelation.scala:106)
        at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:56)
        at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation.run(InsertIntoHadoopFsRelation.scala:106)
        at org.apache.spark.sql.execution.ExecutedCommand.sideEffectResult$lzycompute(commands.scala:58)
        at org.apache.spark.sql.execution.ExecutedCommand.sideEffectResult(commands.scala:56)
        at org.apache.spark.sql.execution.ExecutedCommand.doExecute(commands.scala:70)
        at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:132)
        at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:130)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
Below is my code:

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.{DataFrame, Row, SQLContext, SaveMode}
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._
import scala.concurrent.{Await, Future}

object ConcurrentAppends {

  val outputDir = "hdfs://localhost:9000/user/temp/person"

  def main(args: Array[String]): Unit = {
    val sqlContext = {
      val config = new SparkConf().setAppName("test").setIfMissing("spark.master", "local[*]")
      val sc = new SparkContext(config)
      new SQLContext(sc)
    }

    // Two concurrent appends to the same output folder.
    val futureA = Future(badAppend(sqlContext))
    val futureB = Future(badAppend(sqlContext))

    val result: Future[Long] = for {
      countA <- futureA
      countB <- futureB
    } yield countA + countB

    val timeout = 60.seconds
    val count = Await.result(result, timeout)
    println("Count=" + count)
  }

  /**
   * Appends some rows to the person folder.
   */
  def badAppend(sqlContext: SQLContext): Long = {
    println(
      s"""
         |sqlContext=${sqlContext.hashCode()}
         |thread=${Thread.currentThread().getName}
         |""".stripMargin)
    val personsDF: DataFrame = persons(sqlContext)
    personsDF.write.partitionBy("country").mode(SaveMode.Append).save(outputDir)
    personsDF.count
  }

  /**
   * @return A DataFrame of rows.
   */
  def persons(sqlContext: SQLContext, rowsPerCountry: Int = 100): DataFrame = {
    val personSchema = StructType(
      Seq(
        StructField("name", StringType, nullable = true),
        StructField("age", IntegerType, nullable = true),
        StructField("gender", StringType, nullable = true),
        StructField("country", StringType, nullable = true)
      )
    )
    val noOfCountry = 10
    val rows = for {
      countryIndex <- 0 until noOfCountry
      recIndex <- 0 until rowsPerCountry
    } yield Row(s"foo-$recIndex", 10, "male", s"UK-$countryIndex")

    val rdd = sqlContext.sparkContext.parallelize(rows)
    sqlContext.createDataFrame(rdd, personSchema)
  }
}

-------------------------------

The above issue is described in https://issues.apache.org/jira/browse/SPARK-10109, which is still open. One way to allow concurrent appends is some sort of sharding, so that each thread writes to a different folder and each write job gets its own _temporary directory (a rough sketch is at the end of this post).

It would be very much appreciated if someone would share a better solution. Thanks in advance for any suggestions!

Shing
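P.S. Here is the sharding workaround I have in mind, as a sketch only: the shardedAppend helper is hypothetical and untested, and it reuses persons and outputDir from the code above. Each thread filters out its own country value and writes directly into the Hive-style country=<value> subfolder that partitionBy would otherwise have created, so no two jobs share the same _temporary directory:

  /**
   * Hypothetical sharding workaround: write each partition value's rows
   * straight into its own subfolder, so concurrent jobs never share
   * outputDir/_temporary.
   */
  def shardedAppend(sqlContext: SQLContext, country: String): Long = {
    // Keep only this thread's partition value.
    val df = persons(sqlContext).filter(s"country = '$country'")
    // Drop the partition column and append into the country=<value>
    // subfolder; each job then commits via its own _temporary directory
    // under a distinct output path.
    df.drop("country")
      .write
      .mode(SaveMode.Append)
      .save(s"$outputDir/country=$country")
    df.count()
  }

If the directory names follow the usual country=... partition naming scheme, reading the base folder back with sqlContext.read.parquet(outputDir) should still discover them as partitions, so downstream readers would not need to change.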