I am trying to prototype using a single SQLContext instance to append DataFrames, partitioned by a field, to the same HDFS folder from multiple threads. (Each thread works with a DataFrame having a different partition column value.)
I get this exception:

16/09/24 16:45:12 ERROR [ForkJoinPool-3-worker-13] InsertIntoHadoopFsRelation: Aborting job.
java.io.FileNotFoundException: File hdfs://localhost:9000/user/temp/person/_temporary/0/task_201609241645_0001_m_000000/country=UK-1 does not exist.
	at org.apache.hadoop.hdfs.DistributedFileSystem.listStatusInternal(DistributedFileSystem.java:644)
	at org.apache.hadoop.hdfs.DistributedFileSystem.access$600(DistributedFileSystem.java:92)
	at org.apache.hadoop.hdfs.DistributedFileSystem$14.doCall(DistributedFileSystem.java:702)
	at org.apache.hadoop.hdfs.DistributedFileSystem$14.doCall(DistributedFileSystem.java:698)
	at org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
	at org.apache.hadoop.hdfs.DistributedFileSystem.listStatus(DistributedFileSystem.java:698)
	at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.mergePaths(FileOutputCommitter.java:360)
	at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.mergePaths(FileOutputCommitter.java:362)
	at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.commitJob(FileOutputCommitter.java:310)
	at org.apache.parquet.hadoop.ParquetOutputCommitter.commitJob(ParquetOutputCommitter.java:46)
	at org.apache.spark.sql.execution.datasources.BaseWriterContainer.commitJob(WriterContainer.scala:230)
	at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation$$anonfun$run$1.apply$mcV$sp(InsertIntoHadoopFsRelation.scala:149)
	at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation$$anonfun$run$1.apply(InsertIntoHadoopFsRelation.scala:106)
	at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation$$anonfun$run$1.apply(InsertIntoHadoopFsRelation.scala:106)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:56)
	at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation.run(InsertIntoHadoopFsRelation.scala:106)
	at org.apache.spark.sql.execution.ExecutedCommand.sideEffectResult$lzycompute(commands.scala:58)
	at org.apache.spark.sql.execution.ExecutedCommand.sideEffectResult(commands.scala:56)
	at org.apache.spark.sql.execution.ExecutedCommand.doExecute(commands.scala:70)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:132)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:130)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
Below is my code.
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.{DataFrame, Row, SQLContext, SaveMode}
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._
import scala.concurrent.{Await, Future}

object ConcurrentAppends {

  val outputDir = "hdfs://localhost:9000/user/temp/person"

  def main(args: Array[String]): Unit = {
    val sqlContext = {
      val config = new SparkConf().setAppName("test").setIfMissing("spark.master", "local[*]")
      val sc = new SparkContext(config)
      new SQLContext(sc)
    }

    // Kick off two appends to the same output folder concurrently.
    val futureA = Future(badAppend(sqlContext))
    val futureB = Future(badAppend(sqlContext))

    val result: Future[Long] = for {
      countA <- futureA
      countB <- futureB
    } yield countA + countB

    val timeout = 60.seconds
    val count = Await.result(result, timeout)
    println("Count=" + count)
  }

  /** Appends some rows to the folder person. */
  def badAppend(sqlContext: SQLContext): Long = {
    println(
      s"""
         |sqlContext=${sqlContext.hashCode()}
         |thread=${Thread.currentThread().getName}
         |""".stripMargin)
    val personsDF: DataFrame = persons(sqlContext)
    personsDF.write.partitionBy("country").mode(SaveMode.Append).save(outputDir)
    personsDF.count
  }

  /** @return A DataFrame of sample rows. */
  def persons(sqlContext: SQLContext, rowsPerCountry: Int = 100): DataFrame = {
    val personSchema = StructType(Seq(
      StructField("name", StringType, nullable = true),
      StructField("age", IntegerType, nullable = true),
      StructField("gender", StringType, nullable = true),
      StructField("country", StringType, nullable = true)
    ))

    val noOfCountry = 10
    val rows = for {
      countryIndex <- 0 until noOfCountry
      recIndex <- 0 until rowsPerCountry
    } yield Row(s"foo-$recIndex", 10, "male", s"UK-$countryIndex")

    val rdd = sqlContext.sparkContext.parallelize(rows)
    sqlContext.createDataFrame(rdd, personSchema)
  }
}
-------------------------------

The above issue is reported in https://issues.apache.org/jira/browse/SPARK-10109, which is still open.
One way to allow concurrent appends is some form of sharding: each thread writes to its own subfolder, so each append job gets its own _temporary directory (see the sketch below).
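
For illustration, here is a minimal sketch of that workaround, reusing persons and outputDir from the code above; the shard=<id> folder layout and the name shardedAppend are my own assumptions, not an established API:

  /** Hypothetical variant of badAppend: each thread appends under its own
    * shard subfolder, so concurrent jobs no longer share a _temporary directory. */
  def shardedAppend(sqlContext: SQLContext, shardId: Int): Long = {
    val personsDF: DataFrame = persons(sqlContext)
    val shardDir = s"$outputDir/shard=$shardId" // assumed per-thread folder layout
    personsDF.write.partitionBy("country").mode(SaveMode.Append).save(shardDir)
    personsDF.count
  }

  // Each future then writes to a distinct shard:
  // val futureA = Future(shardedAppend(sqlContext, shardId = 0))
  // val futureB = Future(shardedAppend(sqlContext, shardId = 1))

Readers would then point at the parent folder (partition discovery should pick up shard=<id> as just another partition column), but the extra level in the layout is the downside.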
It would be very much appreciated if someone would share a better solution.
Thanks in advance for any suggestions!
Shing