This is an automated email from the ASF dual-hosted git repository.

jiayu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/sedona.git


The following commit(s) were added to refs/heads/master by this push:
     new 8e9a696ae [SEDONA-495] Raster data source uses shared FileSystem connections which lead to race condition (#1236)
8e9a696ae is described below

commit 8e9a696ae63ccc927de98fba2600acb8b73ad6ef
Author: Jia Yu <[email protected]>
AuthorDate: Thu Feb 15 00:07:09 2024 -0800

    [SEDONA-495] Raster data source uses shared FileSystem connections which lead to race condition (#1236)
---
 .../sedona_sql/io/raster/RasterFileFormat.scala    |  5 ++---
 .../org/apache/sedona/sql/TestBaseScala.scala      | 24 +++++++++++++---------
 .../scala/org/apache/sedona/sql/rasterIOTest.scala |  9 +++++---
 3 files changed, 22 insertions(+), 16 deletions(-)

diff --git a/spark/common/src/main/scala/org/apache/spark/sql/sedona_sql/io/raster/RasterFileFormat.scala b/spark/common/src/main/scala/org/apache/spark/sql/sedona_sql/io/raster/RasterFileFormat.scala
index d7851b11d..eaa3d0295 100644
--- a/spark/common/src/main/scala/org/apache/spark/sql/sedona_sql/io/raster/RasterFileFormat.scala
+++ b/spark/common/src/main/scala/org/apache/spark/sql/sedona_sql/io/raster/RasterFileFormat.scala
@@ -20,7 +20,7 @@
 
 package org.apache.spark.sql.sedona_sql.io.raster
 
-import org.apache.hadoop.fs.{FileStatus, Path}
+import org.apache.hadoop.fs.{FileStatus, FileSystem, Path}
 import org.apache.hadoop.mapreduce.{Job, TaskAttemptContext}
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.InternalRow
@@ -29,7 +29,6 @@ import org.apache.spark.sql.sources.DataSourceRegister
 import org.apache.spark.sql.types.StructType
 
 import java.io.IOException
-import java.nio.file.Paths
 import java.util.UUID
 
 private[spark] class RasterFileFormat extends FileFormat with DataSourceRegister {
@@ -82,7 +81,7 @@ private class RasterFileWriter(savePath: String,
                                 dataSchema: StructType,
                                 context: TaskAttemptContext) extends OutputWriter {
 
-  private val hfs = new Path(savePath).getFileSystem(context.getConfiguration)
+  private val hfs = FileSystem.newInstance(new Path(savePath).toUri, context.getConfiguration)
   private val rasterFieldIndex = if (rasterOptions.rasterField.isEmpty) getRasterFieldIndex else dataSchema.fieldIndex(rasterOptions.rasterField.get)
 
   private def getRasterFieldIndex: Int = {
diff --git a/spark/common/src/test/scala/org/apache/sedona/sql/TestBaseScala.scala b/spark/common/src/test/scala/org/apache/sedona/sql/TestBaseScala.scala
index fec235696..4c50bc3c0 100644
--- a/spark/common/src/test/scala/org/apache/sedona/sql/TestBaseScala.scala
+++ b/spark/common/src/test/scala/org/apache/sedona/sql/TestBaseScala.scala
@@ -78,19 +78,9 @@ trait TestBaseScala extends FunSpec with BeforeAndAfterAll {
   val buildingDataLocation: String = resourceFolder + "813_buildings_test.csv"
   val smallRasterDataLocation: String = resourceFolder + "raster/test1.tiff"
   private val factory = new GeometryFactory()
-  var hdfsURI: String = _
-
 
   override def beforeAll(): Unit = {
     SedonaContext.create(sparkSession)
-    // Set up HDFS minicluster
-    val baseDir = new File("./target/hdfs/").getAbsoluteFile
-    FileUtil.fullyDelete(baseDir)
-    val hdfsConf = new HdfsConfiguration
-    hdfsConf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, baseDir.getAbsolutePath)
-    val builder = new MiniDFSCluster.Builder(hdfsConf)
-    val hdfsCluster = builder.build
-    hdfsURI = "hdfs://127.0.0.1:" + hdfsCluster.getNameNodePort + "/"
   }
 
   override def afterAll(): Unit = {
@@ -237,4 +227,18 @@ trait TestBaseScala extends FunSpec with BeforeAndAfterAll {
       }).sum
     }).sum
   }
+
+  /**
+    * Create a mini HDFS cluster and return the HDFS instance and the URI.
+    * @return (MiniDFSCluster, HDFS URI)
+    */
+  def creatMiniHdfs(): (MiniDFSCluster, String) = {
+    val baseDir = new File("./target/hdfs/").getAbsoluteFile
+    FileUtil.fullyDelete(baseDir)
+    val hdfsConf = new HdfsConfiguration
+    hdfsConf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, baseDir.getAbsolutePath)
+    val builder = new MiniDFSCluster.Builder(hdfsConf)
+    val hdfsCluster = builder.build
+    (hdfsCluster, "hdfs://127.0.0.1:" + hdfsCluster.getNameNodePort + "/")
+  }
 }
diff --git a/spark/common/src/test/scala/org/apache/sedona/sql/rasterIOTest.scala b/spark/common/src/test/scala/org/apache/sedona/sql/rasterIOTest.scala
index d5203e6a0..3875439d2 100644
--- a/spark/common/src/test/scala/org/apache/sedona/sql/rasterIOTest.scala
+++ b/spark/common/src/test/scala/org/apache/sedona/sql/rasterIOTest.scala
@@ -19,6 +19,7 @@
 package org.apache.sedona.sql
 
 import org.apache.commons.io.FileUtils
+import org.apache.hadoop.hdfs.MiniDFSCluster
 import org.apache.spark.sql.SaveMode
 import org.junit.Assert.assertEquals
 import org.scalatest.{BeforeAndAfter, GivenWhenThen}
@@ -149,12 +150,14 @@ class rasterIOTest extends TestBaseScala with BeforeAndAfter with GivenWhenThen
     }
 
     it("should read geotiff using binary source and write geotiff back to hdfs using raster source") {
-      var rasterDf = sparkSession.read.format("binaryFile").load(rasterdatalocation)
+      val miniHDFS: (MiniDFSCluster, String) = creatMiniHdfs()
+      var rasterDf = sparkSession.read.format("binaryFile").load(rasterdatalocation).repartition(3)
       val rasterCount = rasterDf.count()
-      rasterDf.write.format("raster").mode(SaveMode.Overwrite).save(hdfsURI + "/raster-written")
-      rasterDf = sparkSession.read.format("binaryFile").load(hdfsURI + "/raster-written/*")
+      rasterDf.write.format("raster").mode(SaveMode.Overwrite).save(miniHDFS._2 + "/raster-written")
+      rasterDf = sparkSession.read.format("binaryFile").load(miniHDFS._2 + "/raster-written/*")
       rasterDf = rasterDf.selectExpr("RS_FromGeoTiff(content)")
       assert(rasterDf.count() == rasterCount)
+      miniHDFS._1.shutdown()
     }
   }
 

Reply via email to