I am trying to prototype using a single SQLContext instance to append
DataFrames, partitioned by a field, to the same HDFS folder from multiple
threads. (Each thread will work with a DataFrame that has a different
partition column value.)
I get the following exception:

16/09/24 16:45:12 ERROR [ForkJoinPool-3-worker-13] InsertIntoHadoopFsRelation: Aborting job.
java.io.FileNotFoundException: File hdfs://localhost:9000/user/temp/person/_temporary/0/task_201609241645_0001_m_000000/country=UK-1 does not exist.
 at org.apache.hadoop.hdfs.DistributedFileSystem.listStatusInternal(DistributedFileSystem.java:644)
 at org.apache.hadoop.hdfs.DistributedFileSystem.access$600(DistributedFileSystem.java:92)
 at org.apache.hadoop.hdfs.DistributedFileSystem$14.doCall(DistributedFileSystem.java:702)
 at org.apache.hadoop.hdfs.DistributedFileSystem$14.doCall(DistributedFileSystem.java:698)
 at org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
 at org.apache.hadoop.hdfs.DistributedFileSystem.listStatus(DistributedFileSystem.java:698)
 at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.mergePaths(FileOutputCommitter.java:360)
 at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.mergePaths(FileOutputCommitter.java:362)
 at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.commitJob(FileOutputCommitter.java:310)
 at org.apache.parquet.hadoop.ParquetOutputCommitter.commitJob(ParquetOutputCommitter.java:46)
 at org.apache.spark.sql.execution.datasources.BaseWriterContainer.commitJob(WriterContainer.scala:230)
 at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation$$anonfun$run$1.apply$mcV$sp(InsertIntoHadoopFsRelation.scala:149)
 at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation$$anonfun$run$1.apply(InsertIntoHadoopFsRelation.scala:106)
 at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation$$anonfun$run$1.apply(InsertIntoHadoopFsRelation.scala:106)
 at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:56)
 at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation.run(InsertIntoHadoopFsRelation.scala:106)
 at org.apache.spark.sql.execution.ExecutedCommand.sideEffectResult$lzycompute(commands.scala:58)
 at org.apache.spark.sql.execution.ExecutedCommand.sideEffectResult(commands.scala:56)
 at org.apache.spark.sql.execution.ExecutedCommand.doExecute(commands.scala:70)
 at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:132)
 at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:130)
 at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)

  Below is my code. 
object ConcurrentAppends {
 val outputDir = "hdfs://localhost:9000/user/temp/person"
 /**
  * Entry point: creates one shared SQLContext, runs two appends to the same
  * output folder concurrently, and prints the combined row count.
  *
  * The SparkContext is always stopped before returning, including when an
  * append fails or the wait times out.
  *
  * @param args command-line arguments (unused)
  */
 def main(args: Array[String]): Unit = {
   val sqlContext = {
     val config = new SparkConf().setAppName("test").setIfMissing("spark.master", "local[*]")
     val sc = new SparkContext(config)
     new SQLContext(sc)
   }

   try {
     // Both futures are created before the for-comprehension so that the two
     // appends run in parallel rather than one after the other.
     val futureA = Future(badAppend(sqlContext))
     val futureB = Future(badAppend(sqlContext))

     val result: Future[Long] = for {
       countA <- futureA
       countB <- futureB
     } yield countA + countB

     val timeout = 60.seconds
     val count = Await.result(result, timeout)
     println("Count=" + count)
   } finally {
     // Release the SparkContext even if an append failed or Await timed out.
     sqlContext.sparkContext.stop()
   }
 }

 /**
  * Appends some rows to folder person.
  *
  * Prints the identity hash of the SQLContext and the current thread name,
  * then writes a freshly generated DataFrame to `outputDir` in append mode,
  * partitioned by the "country" column.
  *
  * @param sqlContext the shared SQLContext used to build and write the DataFrame
  * @return the number of rows in the DataFrame that was written
  */
 def badAppend(sqlContext: SQLContext): Long = {
   val banner =
     s"""
        |sqlContext=${sqlContext.hashCode()}
        |thread=${Thread.currentThread().getName}
        |""".stripMargin
   println(banner)

   val df: DataFrame = persons(sqlContext)

   // Append into the shared output folder, one sub-folder per country value.
   val writer = df.write.partitionBy("country").mode(SaveMode.Append)
   writer.save(outputDir)

   df.count
 }

 /**
  * Builds a DataFrame of synthetic person rows.
  *
  * Rows are generated for 10 country values ("UK-0" to "UK-9"), with
  * `rowsPerCountry` rows per country; every row has age 10 and gender "male".
  *
  * @param sqlContext     context used to create the RDD and the DataFrame
  * @param rowsPerCountry number of rows generated for each country value
  * @return A dataframe of rows with columns name, age, gender and country
  */
 def persons(sqlContext: SQLContext, rowsPerCountry: Int = 100): DataFrame = {
   val noOfCountry = 10

   val schema = StructType(
     Seq(
       StructField("name", StringType, nullable = true),
       StructField("age", IntegerType, nullable = true),
       StructField("gender", StringType, nullable = true),
       StructField("country", StringType, nullable = true)
     )
   )

   // Country is the outer loop and record index the inner one.
   val generated = (0 until noOfCountry).flatMap { countryIndex =>
     (0 until rowsPerCountry).map { recIndex =>
       Row(s"foo-$recIndex", 10, "male", s"UK-$countryIndex")
     }
   }

   val rowRdd = sqlContext.sparkContext.parallelize(generated)
   sqlContext.createDataFrame(rowRdd, schema)
 }
}}
-------------------------------
The above issue is mentioned in https://issues.apache.org/jira/browse/SPARK-10109, which is still open.
One way to support concurrent appends is to use some sort of sharding, so that
each thread writes to a different folder and therefore has its own temporary
directory.
It would be very much appreciated if someone would share a better solution.
Thanks in advance for any suggestions!
Shing

Reply via email to