This is an automated email from the ASF dual-hosted git repository.

jiayu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-sedona.git


The following commit(s) were added to refs/heads/master by this push:
     new 4573274  [SEDONA-35] address user-data mutability issue with 
`Adapter.toDF()` (#525)
4573274 is described below

commit 4573274a0617bc130647121f9c82f9bd43457450
Author: Yitao Li <[email protected]>
AuthorDate: Wed May 19 00:34:41 2021 +0000

    [SEDONA-35] address user-data mutability issue with `Adapter.toDF()` (#525)
    
    * fix user-data mutability issue with toDF
    
    Signed-off-by: Yitao Li <[email protected]>
    
    * fixing a similar (but different) issue with pair RDD; adding test cases
    
    Signed-off-by: Yitao Li <[email protected]>
---
 .../org/apache/sedona/sql/utils/Adapter.scala      | 18 +++++----
 .../org/apache/sedona/sql/adapterTestScala.scala   | 47 ++++++++++++++++++++++
 2 files changed, 58 insertions(+), 7 deletions(-)

diff --git a/sql/src/main/scala/org/apache/sedona/sql/utils/Adapter.scala 
b/sql/src/main/scala/org/apache/sedona/sql/utils/Adapter.scala
index 8611e15..761153e 100644
--- a/sql/src/main/scala/org/apache/sedona/sql/utils/Adapter.scala
+++ b/sql/src/main/scala/org/apache/sedona/sql/utils/Adapter.scala
@@ -113,11 +113,14 @@ object Adapter {
   }
 
   def toDf[T <: Geometry](spatialRDD: SpatialRDD[T], fieldNames: Seq[String], 
sparkSession: SparkSession): DataFrame = {
-    val rowRdd = spatialRDD.rawSpatialRDD.rdd.map[Row](f => {
-      var userData = f.getUserData
-      f.setUserData(null)
-      if (userData != null) Row.fromSeq(f +: 
userData.asInstanceOf[String].split("\t", -1))
-      else Row.fromSeq(Seq(f))
+    val rowRdd = spatialRDD.rawSpatialRDD.rdd.map[Row](geom => {
+      val userData = geom.getUserData
+      val geomWithoutUserData = geom.copy
+      geomWithoutUserData.setUserData(null)
+      if (userData != null)
+        Row.fromSeq(geomWithoutUserData +: 
userData.asInstanceOf[String].split("\t", -1))
+      else
+        Row.fromSeq(Seq(geom))
     })
     var cols:Seq[StructField] = Seq(StructField("geometry", GeometryUDT))
     if (fieldNames != null && fieldNames.nonEmpty) {
@@ -166,8 +169,9 @@ object Adapter {
     if (fieldNames != null && fieldNames.nonEmpty) {
       val userData = "" + geom.getUserData.asInstanceOf[String]
       val fields = userData.split("\t")
-//      geom.setUserData(null) // Set to null will lead to the null pointer 
exception of the previous line. Not sure why.
-      (Seq(geom), fields)
+      val geomWithoutUserData = geom.copy
+      geomWithoutUserData.setUserData(null)
+      (Seq(geomWithoutUserData), fields)
     }
     else (Seq(geom), Seq())
   }
diff --git a/sql/src/test/scala/org/apache/sedona/sql/adapterTestScala.scala 
b/sql/src/test/scala/org/apache/sedona/sql/adapterTestScala.scala
index da4cb70..aad5b88 100644
--- a/sql/src/test/scala/org/apache/sedona/sql/adapterTestScala.scala
+++ b/sql/src/test/scala/org/apache/sedona/sql/adapterTestScala.scala
@@ -27,6 +27,7 @@ import org.apache.sedona.core.spatialRDD.{CircleRDD, 
PointRDD, PolygonRDD}
 import org.apache.sedona.sql.utils.Adapter
 import org.apache.spark.sql.sedona_sql.UDT.GeometryUDT
 import org.apache.spark.storage.StorageLevel
+import org.locationtech.jts.geom.Point
 import org.scalatest.GivenWhenThen
 
 class adapterTestScala extends TestBaseScala with GivenWhenThen{
@@ -197,5 +198,51 @@ class adapterTestScala extends TestBaseScala with 
GivenWhenThen{
       val spatialDf = Adapter.toDf(spatialRDD, sparkSession)
       assert(spatialDf.schema.fields(1).name == "LST")
     }
+
+    it("can convert spatial RDD with user data to a valid Dataframe") {
+      val srcDF = sparkSession.sql("select 
ST_PointFromText('40.7128,-74.0060', ',') as geom, \"attr1\" as attr1, 
\"attr2\" as attr2")
+      val rdd = Adapter.toSpatialRdd(srcDF, "geom")
+      val df = Adapter.toDf(rdd, Seq("attr1", "attr2"), sparkSession)
+      df.unpersist(true)
+      // verify the resulting Spark dataframe can be successfully evaluated 
repeatedly
+      for (_ <- 1 to 5) {
+        val rows = df.collect
+        assert(rows.length == 1)
+        val geom = rows(0).get(0).asInstanceOf[Point]
+        assert(geom.getX == 40.7128)
+        assert(geom.getY == -74.006)
+        assert(rows(0).get(1).asInstanceOf[String] == "attr1")
+        assert(rows(0).get(2).asInstanceOf[String] == "attr2")
+      }
+    }
+
+    it("can convert spatial pair RDD with user data to a valid Dataframe") {
+      var srcDF = sparkSession.read.format("csv").option("delimiter", 
"\t").option("header", "false").load(mixedWktGeometryInputLocation)
+      srcDF.createOrReplaceTempView("inputtable")
+      var leftDF = sparkSession.sql("select ST_GeomFromWKT(inputtable._c0) as 
leftGeom, \"attr1\" as attr1, \"attr2\" as attr2 from inputtable")
+      val rightDF = sparkSession.sql("select 
ST_PointFromText('40.7128,-74.0060', ',') as rightGeom, \"attr3\" as attr3, 
\"attr4\" as attr4")
+      val leftRDD = Adapter.toSpatialRdd(leftDF, "leftGeom")
+      leftRDD.analyze()
+      val rightRDD = Adapter.toSpatialRdd(rightDF, "rightGeom")
+      rightRDD.analyze()
+      leftRDD.spatialPartitioning(GridType.QUADTREE)
+      rightRDD.spatialPartitioning(leftRDD.getPartitioner)
+      val pairRDD = JoinQuery.SpatialJoinQueryFlat(leftRDD, rightRDD, true, 
true)
+      val pairDF = Adapter.toDf(pairRDD, Seq("attr1", "attr2"), Seq("attr3", 
"attr4"), sparkSession)
+      pairDF.unpersist(true)
+      // verify the resulting Spark dataframe can be successfully evaluated 
repeatedly
+      for (_ <- 1 to 5) {
+        val rows = pairDF.collect
+        for (row <- rows) {
+          assert(row.get(1).asInstanceOf[String] == "attr1")
+          assert(row.get(2).asInstanceOf[String] == "attr2")
+          val pt = row.get(3).asInstanceOf[Point]
+          assert(pt.getX == 40.7128)
+          assert(pt.getY == -74.006)
+          assert(row.get(4).asInstanceOf[String] == "attr3")
+          assert(row.get(5).asInstanceOf[String] == "attr4")
+        }
+      }
+    }
   }
 }

Reply via email to