This is an automated email from the ASF dual-hosted git repository.
jiayu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-sedona.git
The following commit(s) were added to refs/heads/master by this push:
new 4573274 [SEDONA-35] address user-data mutability issue with `Adapter.toDF()` (#525)
4573274 is described below
commit 4573274a0617bc130647121f9c82f9bd43457450
Author: Yitao Li <[email protected]>
AuthorDate: Wed May 19 00:34:41 2021 +0000
[SEDONA-35] address user-data mutability issue with `Adapter.toDF()` (#525)
* fix user-data mutability issue with toDF
Signed-off-by: Yitao Li <[email protected]>
* fixing a similar (but different) issue with pair RDD; adding test cases
Signed-off-by: Yitao Li <[email protected]>
---
.../org/apache/sedona/sql/utils/Adapter.scala | 18 +++++----
.../org/apache/sedona/sql/adapterTestScala.scala | 47 ++++++++++++++++++++++
2 files changed, 58 insertions(+), 7 deletions(-)
diff --git a/sql/src/main/scala/org/apache/sedona/sql/utils/Adapter.scala b/sql/src/main/scala/org/apache/sedona/sql/utils/Adapter.scala
index 8611e15..761153e 100644
--- a/sql/src/main/scala/org/apache/sedona/sql/utils/Adapter.scala
+++ b/sql/src/main/scala/org/apache/sedona/sql/utils/Adapter.scala
@@ -113,11 +113,14 @@ object Adapter {
}
def toDf[T <: Geometry](spatialRDD: SpatialRDD[T], fieldNames: Seq[String],
sparkSession: SparkSession): DataFrame = {
- val rowRdd = spatialRDD.rawSpatialRDD.rdd.map[Row](f => {
- var userData = f.getUserData
- f.setUserData(null)
- if (userData != null) Row.fromSeq(f +: userData.asInstanceOf[String].split("\t", -1))
- else Row.fromSeq(Seq(f))
+ val rowRdd = spatialRDD.rawSpatialRDD.rdd.map[Row](geom => {
+ val userData = geom.getUserData
+ val geomWithoutUserData = geom.copy
+ geomWithoutUserData.setUserData(null)
+ if (userData != null)
+ Row.fromSeq(geomWithoutUserData +: userData.asInstanceOf[String].split("\t", -1))
+ else
+ Row.fromSeq(Seq(geom))
})
var cols:Seq[StructField] = Seq(StructField("geometry", GeometryUDT))
if (fieldNames != null && fieldNames.nonEmpty) {
@@ -166,8 +169,9 @@ object Adapter {
if (fieldNames != null && fieldNames.nonEmpty) {
val userData = "" + geom.getUserData.asInstanceOf[String]
val fields = userData.split("\t")
-// geom.setUserData(null) // Set to null will lead to the null pointer exception of the previous line. Not sure why.
- (Seq(geom), fields)
+ val geomWithoutUserData = geom.copy
+ geomWithoutUserData.setUserData(null)
+ (Seq(geomWithoutUserData), fields)
}
else (Seq(geom), Seq())
}
diff --git a/sql/src/test/scala/org/apache/sedona/sql/adapterTestScala.scala b/sql/src/test/scala/org/apache/sedona/sql/adapterTestScala.scala
index da4cb70..aad5b88 100644
--- a/sql/src/test/scala/org/apache/sedona/sql/adapterTestScala.scala
+++ b/sql/src/test/scala/org/apache/sedona/sql/adapterTestScala.scala
@@ -27,6 +27,7 @@ import org.apache.sedona.core.spatialRDD.{CircleRDD, PointRDD, PolygonRDD}
import org.apache.sedona.sql.utils.Adapter
import org.apache.spark.sql.sedona_sql.UDT.GeometryUDT
import org.apache.spark.storage.StorageLevel
+import org.locationtech.jts.geom.Point
import org.scalatest.GivenWhenThen
class adapterTestScala extends TestBaseScala with GivenWhenThen{
@@ -197,5 +198,51 @@ class adapterTestScala extends TestBaseScala with GivenWhenThen{
val spatialDf = Adapter.toDf(spatialRDD, sparkSession)
assert(spatialDf.schema.fields(1).name == "LST")
}
+
+ it("can convert spatial RDD with user data to a valid Dataframe") {
+ val srcDF = sparkSession.sql("select ST_PointFromText('40.7128,-74.0060', ',') as geom, \"attr1\" as attr1, \"attr2\" as attr2")
+ val rdd = Adapter.toSpatialRdd(srcDF, "geom")
+ val df = Adapter.toDf(rdd, Seq("attr1", "attr2"), sparkSession)
+ df.unpersist(true)
+ // verify the resulting Spark dataframe can be successfully evaluated repeatedly
+ for (_ <- 1 to 5) {
+ val rows = df.collect
+ assert(rows.length == 1)
+ val geom = rows(0).get(0).asInstanceOf[Point]
+ assert(geom.getX == 40.7128)
+ assert(geom.getY == -74.006)
+ assert(rows(0).get(1).asInstanceOf[String] == "attr1")
+ assert(rows(0).get(2).asInstanceOf[String] == "attr2")
+ }
+ }
+
+ it("can convert spatial pair RDD with user data to a valid Dataframe") {
+ var srcDF = sparkSession.read.format("csv").option("delimiter", "\t").option("header", "false").load(mixedWktGeometryInputLocation)
+ srcDF.createOrReplaceTempView("inputtable")
+ var leftDF = sparkSession.sql("select ST_GeomFromWKT(inputtable._c0) as leftGeom, \"attr1\" as attr1, \"attr2\" as attr2 from inputtable")
+ val rightDF = sparkSession.sql("select ST_PointFromText('40.7128,-74.0060', ',') as rightGeom, \"attr3\" as attr3, \"attr4\" as attr4")
+ val leftRDD = Adapter.toSpatialRdd(leftDF, "leftGeom")
+ leftRDD.analyze()
+ val rightRDD = Adapter.toSpatialRdd(rightDF, "rightGeom")
+ rightRDD.analyze()
+ leftRDD.spatialPartitioning(GridType.QUADTREE)
+ rightRDD.spatialPartitioning(leftRDD.getPartitioner)
+ val pairRDD = JoinQuery.SpatialJoinQueryFlat(leftRDD, rightRDD, true, true)
+ val pairDF = Adapter.toDf(pairRDD, Seq("attr1", "attr2"), Seq("attr3", "attr4"), sparkSession)
+ pairDF.unpersist(true)
+ // verify the resulting Spark dataframe can be successfully evaluated repeatedly
+ for (_ <- 1 to 5) {
+ val rows = pairDF.collect
+ for (row <- rows) {
+ assert(row.get(1).asInstanceOf[String] == "attr1")
+ assert(row.get(2).asInstanceOf[String] == "attr2")
+ val pt = row.get(3).asInstanceOf[Point]
+ assert(pt.getX == 40.7128)
+ assert(pt.getY == -74.006)
+ assert(row.get(4).asInstanceOf[String] == "attr1")
+ assert(row.get(5).asInstanceOf[String] == "attr2")
+ }
+ }
+ }
}
}