This is an automated email from the ASF dual-hosted git repository.
jiayu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/sedona.git
The following commit(s) were added to refs/heads/master by this push:
new bedea9a03 [SEDONA-470] Distinguish between missing or null crs from the result of geoparquet.metadata (#1211)
bedea9a03 is described below
commit bedea9a03b29c1bbc0a4739f5308daac685aeddd
Author: Kristin Cowalcijk <[email protected]>
AuthorDate: Mon Jan 22 11:54:45 2024 +0800
[SEDONA-470] Distinguish between missing or null crs from the result of geoparquet.metadata (#1211)
* geoparquet.metadata should show distinct output for missing crs and null crs.
* Apply the patch to Spark 3.4 and Spark 3.5
---
.../datasources/parquet/GeoParquetMetaData.scala | 15 ++++++++++--
.../GeoParquetMetadataPartitionReaderFactory.scala | 3 ++-
.../sedona/sql/GeoParquetMetadataTests.scala | 28 +++++++++++++++++++++-
.../GeoParquetMetadataPartitionReaderFactory.scala | 3 ++-
.../sedona/sql/GeoParquetMetadataTests.scala | 28 +++++++++++++++++++++-
.../GeoParquetMetadataPartitionReaderFactory.scala | 3 ++-
.../sedona/sql/GeoParquetMetadataTests.scala | 28 +++++++++++++++++++++-
7 files changed, 100 insertions(+), 8 deletions(-)
diff --git
a/spark/common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/GeoParquetMetaData.scala
b/spark/common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/GeoParquetMetaData.scala
index a3f23b9c4..7a88a111b 100644
---
a/spark/common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/GeoParquetMetaData.scala
+++
b/spark/common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/GeoParquetMetaData.scala
@@ -14,13 +14,15 @@
package org.apache.spark.sql.execution.datasources.parquet
import org.json4s.jackson.JsonMethods.parse
-import org.json4s.JValue
+import org.json4s.{JNothing, JNull, JValue}
/**
* A case class that holds the metadata of geometry column in GeoParquet
metadata
* @param encoding Name of the geometry encoding format. Currently only "WKB"
is supported
* @param geometryTypes The geometry types of all geometries, or an empty
array if they are not known.
* @param bbox Bounding Box of the geometries in the file, formatted according
to RFC 7946, section 5.
+ * @param crs The CRS of the geometries in the file. None if crs metadata is absent, Some(JNull) if crs is null,
+ *            Some(value) if the crs is present and not null.
*/
case class GeometryFieldMetaData(
encoding: String,
@@ -58,7 +60,16 @@ object GeoParquetMetaData {
  def parseKeyValueMetaData(keyValueMetaData: java.util.Map[String, String]): Option[GeoParquetMetaData] = {
Option(keyValueMetaData.get("geo")).map { geo =>
implicit val formats: org.json4s.Formats = org.json4s.DefaultFormats
-      parse(geo).camelizeKeys.extract[GeoParquetMetaData]
+      val geoObject = parse(geo)
+      val metadata = geoObject.camelizeKeys.extract[GeoParquetMetaData]
+      metadata.copy(columns = metadata.columns.map { case (name, column) =>
+        // Postprocess to distinguish between null (JNull) and missing field (JNothing).
+        geoObject \ "columns" \ name \ "crs" match {
+          case JNothing => name -> column.copy(crs = None)
+          case JNull => name -> column.copy(crs = Some(JNull))
+          case _ => name -> column
+        }
+      })
}
}
}
diff --git
a/spark/spark-3.0/src/main/scala/org/apache/spark/sql/execution/datasources/v2/geoparquet/metadata/GeoParquetMetadataPartitionReaderFactory.scala
b/spark/spark-3.0/src/main/scala/org/apache/spark/sql/execution/datasources/v2/geoparquet/metadata/GeoParquetMetadataPartitionReaderFactory.scala
index fe1b4f8c5..545bbfb31 100644
---
a/spark/spark-3.0/src/main/scala/org/apache/spark/sql/execution/datasources/v2/geoparquet/metadata/GeoParquetMetadataPartitionReaderFactory.scala
+++
b/spark/spark-3.0/src/main/scala/org/apache/spark/sql/execution/datasources/v2/geoparquet/metadata/GeoParquetMetadataPartitionReaderFactory.scala
@@ -66,7 +66,8 @@ object GeoParquetMetadataPartitionReaderFactory {
UTF8String.fromString(columnMetadata.encoding),
new
GenericArrayData(columnMetadata.geometryTypes.map(UTF8String.fromString).toArray),
new GenericArrayData(columnMetadata.bbox.toArray),
-          columnMetadata.crs.map(projjson => UTF8String.fromString(compact(render(projjson)))).orNull)
+          columnMetadata.crs.map(projjson => UTF8String.fromString(compact(render(projjson))))
+            .getOrElse(UTF8String.fromString("")))
val columnMetadataStruct = new
GenericInternalRow(columnMetadataFields)
UTF8String.fromString(columnName) -> columnMetadataStruct
}
diff --git
a/spark/spark-3.0/src/test/scala/org/apache/sedona/sql/GeoParquetMetadataTests.scala
b/spark/spark-3.0/src/test/scala/org/apache/sedona/sql/GeoParquetMetadataTests.scala
index 71e159542..44359defb 100644
---
a/spark/spark-3.0/src/test/scala/org/apache/sedona/sql/GeoParquetMetadataTests.scala
+++
b/spark/spark-3.0/src/test/scala/org/apache/sedona/sql/GeoParquetMetadataTests.scala
@@ -20,6 +20,7 @@ import scala.collection.JavaConverters._
class GeoParquetMetadataTests extends TestBaseScala with BeforeAndAfterAll {
val geoparquetdatalocation: String = resourceFolder + "geoparquet/"
+  val geoparquetoutputlocation: String = resourceFolder + "geoparquet/geoparquet_output/"
describe("GeoParquet Metadata tests") {
it("Reading GeoParquet Metadata") {
@@ -40,7 +41,8 @@ class GeoParquetMetadataTests extends TestBaseScala with
BeforeAndAfterAll {
columnMetadata.getAs[String]("encoding") == "WKB" &&
columnMetadata.getList[Any](columnMetadata.fieldIndex("bbox")).asScala.forall(_.isInstanceOf[Double])
&&
columnMetadata.getList[Any](columnMetadata.fieldIndex("geometry_types")).asScala.forall(_.isInstanceOf[String])
&&
- columnMetadata.getAs[String]("crs").nonEmpty
+ columnMetadata.getAs[String]("crs").nonEmpty &&
+ columnMetadata.getAs[String]("crs") != "null"
}
})
}
@@ -63,5 +65,29 @@ class GeoParquetMetadataTests extends TestBaseScala with
BeforeAndAfterAll {
assert(metadataArray.forall(_.getAs[String]("primary_column") == null))
assert(metadataArray.forall(_.getAs[String]("columns") == null))
}
+
+    it("Read GeoParquet without CRS") {
+      val df = sparkSession.read.format("geoparquet").load(geoparquetdatalocation + "/example-1.0.0-beta.1.parquet")
+      val geoParquetSavePath = geoparquetoutputlocation + "/gp_crs_omit.parquet"
+      df.write.format("geoparquet")
+        .option("geoparquet.crs", "")
+        .mode("overwrite").save(geoParquetSavePath)
+      val dfMeta = sparkSession.read.format("geoparquet.metadata").load(geoParquetSavePath)
+      val row = dfMeta.collect()(0)
+      val metadata = row.getJavaMap(row.fieldIndex("columns")).get("geometry").asInstanceOf[Row]
+      assert(metadata.getAs[String]("crs") == "")
+    }
+
+    it("Read GeoParquet with null CRS") {
+      val df = sparkSession.read.format("geoparquet").load(geoparquetdatalocation + "/example-1.0.0-beta.1.parquet")
+      val geoParquetSavePath = geoparquetoutputlocation + "/gp_crs_null.parquet"
+      df.write.format("geoparquet")
+        .option("geoparquet.crs", "null")
+        .mode("overwrite").save(geoParquetSavePath)
+      val dfMeta = sparkSession.read.format("geoparquet.metadata").load(geoParquetSavePath)
+      val row = dfMeta.collect()(0)
+      val metadata = row.getJavaMap(row.fieldIndex("columns")).get("geometry").asInstanceOf[Row]
+      assert(metadata.getAs[String]("crs") == "null")
+    }
}
}
diff --git
a/spark/spark-3.4/src/main/scala/org/apache/spark/sql/execution/datasources/v2/geoparquet/metadata/GeoParquetMetadataPartitionReaderFactory.scala
b/spark/spark-3.4/src/main/scala/org/apache/spark/sql/execution/datasources/v2/geoparquet/metadata/GeoParquetMetadataPartitionReaderFactory.scala
index 874d2e743..c192a7a94 100644
---
a/spark/spark-3.4/src/main/scala/org/apache/spark/sql/execution/datasources/v2/geoparquet/metadata/GeoParquetMetadataPartitionReaderFactory.scala
+++
b/spark/spark-3.4/src/main/scala/org/apache/spark/sql/execution/datasources/v2/geoparquet/metadata/GeoParquetMetadataPartitionReaderFactory.scala
@@ -66,7 +66,8 @@ object GeoParquetMetadataPartitionReaderFactory {
UTF8String.fromString(columnMetadata.encoding),
new
GenericArrayData(columnMetadata.geometryTypes.map(UTF8String.fromString).toArray),
new GenericArrayData(columnMetadata.bbox.toArray),
-          columnMetadata.crs.map(projjson => UTF8String.fromString(compact(render(projjson)))).orNull)
+          columnMetadata.crs.map(projjson => UTF8String.fromString(compact(render(projjson))))
+            .getOrElse(UTF8String.fromString("")))
val columnMetadataStruct = new
GenericInternalRow(columnMetadataFields)
UTF8String.fromString(columnName) -> columnMetadataStruct
}
diff --git
a/spark/spark-3.4/src/test/scala/org/apache/sedona/sql/GeoParquetMetadataTests.scala
b/spark/spark-3.4/src/test/scala/org/apache/sedona/sql/GeoParquetMetadataTests.scala
index 71e159542..44359defb 100644
---
a/spark/spark-3.4/src/test/scala/org/apache/sedona/sql/GeoParquetMetadataTests.scala
+++
b/spark/spark-3.4/src/test/scala/org/apache/sedona/sql/GeoParquetMetadataTests.scala
@@ -20,6 +20,7 @@ import scala.collection.JavaConverters._
class GeoParquetMetadataTests extends TestBaseScala with BeforeAndAfterAll {
val geoparquetdatalocation: String = resourceFolder + "geoparquet/"
+  val geoparquetoutputlocation: String = resourceFolder + "geoparquet/geoparquet_output/"
describe("GeoParquet Metadata tests") {
it("Reading GeoParquet Metadata") {
@@ -40,7 +41,8 @@ class GeoParquetMetadataTests extends TestBaseScala with
BeforeAndAfterAll {
columnMetadata.getAs[String]("encoding") == "WKB" &&
columnMetadata.getList[Any](columnMetadata.fieldIndex("bbox")).asScala.forall(_.isInstanceOf[Double])
&&
columnMetadata.getList[Any](columnMetadata.fieldIndex("geometry_types")).asScala.forall(_.isInstanceOf[String])
&&
- columnMetadata.getAs[String]("crs").nonEmpty
+ columnMetadata.getAs[String]("crs").nonEmpty &&
+ columnMetadata.getAs[String]("crs") != "null"
}
})
}
@@ -63,5 +65,29 @@ class GeoParquetMetadataTests extends TestBaseScala with
BeforeAndAfterAll {
assert(metadataArray.forall(_.getAs[String]("primary_column") == null))
assert(metadataArray.forall(_.getAs[String]("columns") == null))
}
+
+    it("Read GeoParquet without CRS") {
+      val df = sparkSession.read.format("geoparquet").load(geoparquetdatalocation + "/example-1.0.0-beta.1.parquet")
+      val geoParquetSavePath = geoparquetoutputlocation + "/gp_crs_omit.parquet"
+      df.write.format("geoparquet")
+        .option("geoparquet.crs", "")
+        .mode("overwrite").save(geoParquetSavePath)
+      val dfMeta = sparkSession.read.format("geoparquet.metadata").load(geoParquetSavePath)
+      val row = dfMeta.collect()(0)
+      val metadata = row.getJavaMap(row.fieldIndex("columns")).get("geometry").asInstanceOf[Row]
+      assert(metadata.getAs[String]("crs") == "")
+    }
+
+    it("Read GeoParquet with null CRS") {
+      val df = sparkSession.read.format("geoparquet").load(geoparquetdatalocation + "/example-1.0.0-beta.1.parquet")
+      val geoParquetSavePath = geoparquetoutputlocation + "/gp_crs_null.parquet"
+      df.write.format("geoparquet")
+        .option("geoparquet.crs", "null")
+        .mode("overwrite").save(geoParquetSavePath)
+      val dfMeta = sparkSession.read.format("geoparquet.metadata").load(geoParquetSavePath)
+      val row = dfMeta.collect()(0)
+      val metadata = row.getJavaMap(row.fieldIndex("columns")).get("geometry").asInstanceOf[Row]
+      assert(metadata.getAs[String]("crs") == "null")
+    }
}
}
diff --git
a/spark/spark-3.5/src/main/scala/org/apache/spark/sql/execution/datasources/v2/geoparquet/metadata/GeoParquetMetadataPartitionReaderFactory.scala
b/spark/spark-3.5/src/main/scala/org/apache/spark/sql/execution/datasources/v2/geoparquet/metadata/GeoParquetMetadataPartitionReaderFactory.scala
index 874d2e743..c192a7a94 100644
---
a/spark/spark-3.5/src/main/scala/org/apache/spark/sql/execution/datasources/v2/geoparquet/metadata/GeoParquetMetadataPartitionReaderFactory.scala
+++
b/spark/spark-3.5/src/main/scala/org/apache/spark/sql/execution/datasources/v2/geoparquet/metadata/GeoParquetMetadataPartitionReaderFactory.scala
@@ -66,7 +66,8 @@ object GeoParquetMetadataPartitionReaderFactory {
UTF8String.fromString(columnMetadata.encoding),
new
GenericArrayData(columnMetadata.geometryTypes.map(UTF8String.fromString).toArray),
new GenericArrayData(columnMetadata.bbox.toArray),
-          columnMetadata.crs.map(projjson => UTF8String.fromString(compact(render(projjson)))).orNull)
+          columnMetadata.crs.map(projjson => UTF8String.fromString(compact(render(projjson))))
+            .getOrElse(UTF8String.fromString("")))
val columnMetadataStruct = new
GenericInternalRow(columnMetadataFields)
UTF8String.fromString(columnName) -> columnMetadataStruct
}
diff --git
a/spark/spark-3.5/src/test/scala/org/apache/sedona/sql/GeoParquetMetadataTests.scala
b/spark/spark-3.5/src/test/scala/org/apache/sedona/sql/GeoParquetMetadataTests.scala
index 71e159542..44359defb 100644
---
a/spark/spark-3.5/src/test/scala/org/apache/sedona/sql/GeoParquetMetadataTests.scala
+++
b/spark/spark-3.5/src/test/scala/org/apache/sedona/sql/GeoParquetMetadataTests.scala
@@ -20,6 +20,7 @@ import scala.collection.JavaConverters._
class GeoParquetMetadataTests extends TestBaseScala with BeforeAndAfterAll {
val geoparquetdatalocation: String = resourceFolder + "geoparquet/"
+  val geoparquetoutputlocation: String = resourceFolder + "geoparquet/geoparquet_output/"
describe("GeoParquet Metadata tests") {
it("Reading GeoParquet Metadata") {
@@ -40,7 +41,8 @@ class GeoParquetMetadataTests extends TestBaseScala with
BeforeAndAfterAll {
columnMetadata.getAs[String]("encoding") == "WKB" &&
columnMetadata.getList[Any](columnMetadata.fieldIndex("bbox")).asScala.forall(_.isInstanceOf[Double])
&&
columnMetadata.getList[Any](columnMetadata.fieldIndex("geometry_types")).asScala.forall(_.isInstanceOf[String])
&&
- columnMetadata.getAs[String]("crs").nonEmpty
+ columnMetadata.getAs[String]("crs").nonEmpty &&
+ columnMetadata.getAs[String]("crs") != "null"
}
})
}
@@ -63,5 +65,29 @@ class GeoParquetMetadataTests extends TestBaseScala with
BeforeAndAfterAll {
assert(metadataArray.forall(_.getAs[String]("primary_column") == null))
assert(metadataArray.forall(_.getAs[String]("columns") == null))
}
+
+    it("Read GeoParquet without CRS") {
+      val df = sparkSession.read.format("geoparquet").load(geoparquetdatalocation + "/example-1.0.0-beta.1.parquet")
+      val geoParquetSavePath = geoparquetoutputlocation + "/gp_crs_omit.parquet"
+      df.write.format("geoparquet")
+        .option("geoparquet.crs", "")
+        .mode("overwrite").save(geoParquetSavePath)
+      val dfMeta = sparkSession.read.format("geoparquet.metadata").load(geoParquetSavePath)
+      val row = dfMeta.collect()(0)
+      val metadata = row.getJavaMap(row.fieldIndex("columns")).get("geometry").asInstanceOf[Row]
+      assert(metadata.getAs[String]("crs") == "")
+    }
+
+    it("Read GeoParquet with null CRS") {
+      val df = sparkSession.read.format("geoparquet").load(geoparquetdatalocation + "/example-1.0.0-beta.1.parquet")
+      val geoParquetSavePath = geoparquetoutputlocation + "/gp_crs_null.parquet"
+      df.write.format("geoparquet")
+        .option("geoparquet.crs", "null")
+        .mode("overwrite").save(geoParquetSavePath)
+      val dfMeta = sparkSession.read.format("geoparquet.metadata").load(geoParquetSavePath)
+      val row = dfMeta.collect()(0)
+      val metadata = row.getJavaMap(row.fieldIndex("columns")).get("geometry").asInstanceOf[Row]
+      assert(metadata.getAs[String]("crs") == "null")
+    }
}
}