This is an automated email from the ASF dual-hosted git repository.

jiayu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/sedona.git


The following commit(s) were added to refs/heads/master by this push:
     new bedea9a03 [SEDONA-470] Distinguish between missing or null crs from the result of geoparquet.metadata (#1211)
bedea9a03 is described below

commit bedea9a03b29c1bbc0a4739f5308daac685aeddd
Author: Kristin Cowalcijk <[email protected]>
AuthorDate: Mon Jan 22 11:54:45 2024 +0800

    [SEDONA-470] Distinguish between missing or null crs from the result of geoparquet.metadata (#1211)
    
    * geoparquet.metadata should show distinct output for missing crs and null crs.
    
    * Apply the patch to Spark 3.4 and Spark 3.5
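
For illustration, a minimal sketch (not part of this commit; the input path is hypothetical) of how the distinction surfaces when reading geoparquet.metadata after this change:

    // Assumes a running sparkSession and a GeoParquet file with a "geometry" column.
    import org.apache.spark.sql.Row

    val dfMeta = sparkSession.read.format("geoparquet.metadata").load("/tmp/points.parquet")
    val row = dfMeta.collect()(0)
    val geomMeta = row.getJavaMap(row.fieldIndex("columns")).get("geometry").asInstanceOf[Row]

    geomMeta.getAs[String]("crs") match {
      case ""       => println("crs metadata is absent")
      case "null"   => println("crs is explicitly null")
      case projjson => println(s"PROJJSON crs: $projjson")
    }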
---
 .../datasources/parquet/GeoParquetMetaData.scala   | 15 ++++++++++--
 .../GeoParquetMetadataPartitionReaderFactory.scala |  3 ++-
 .../sedona/sql/GeoParquetMetadataTests.scala       | 28 +++++++++++++++++++++-
 .../GeoParquetMetadataPartitionReaderFactory.scala |  3 ++-
 .../sedona/sql/GeoParquetMetadataTests.scala       | 28 +++++++++++++++++++++-
 .../GeoParquetMetadataPartitionReaderFactory.scala |  3 ++-
 .../sedona/sql/GeoParquetMetadataTests.scala       | 28 +++++++++++++++++++++-
 7 files changed, 100 insertions(+), 8 deletions(-)

diff --git a/spark/common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/GeoParquetMetaData.scala b/spark/common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/GeoParquetMetaData.scala
index a3f23b9c4..7a88a111b 100644
--- a/spark/common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/GeoParquetMetaData.scala
+++ b/spark/common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/GeoParquetMetaData.scala
@@ -14,13 +14,15 @@
 package org.apache.spark.sql.execution.datasources.parquet
 
 import org.json4s.jackson.JsonMethods.parse
-import org.json4s.JValue
+import org.json4s.{JNothing, JNull, JValue}
 
 /**
  * A case class that holds the metadata of geometry column in GeoParquet metadata
  * @param encoding Name of the geometry encoding format. Currently only "WKB" is supported
  * @param geometryTypes The geometry types of all geometries, or an empty array if they are not known.
  * @param bbox Bounding Box of the geometries in the file, formatted according to RFC 7946, section 5.
+ * @param crs The CRS of the geometries in the file. None if crs metadata is absent, Some(JNull) if crs is null,
+ *            Some(value) if the crs is present and not null.
  */
 case class GeometryFieldMetaData(
   encoding: String,
@@ -58,7 +60,16 @@ object GeoParquetMetaData {
   def parseKeyValueMetaData(keyValueMetaData: java.util.Map[String, String]): Option[GeoParquetMetaData] = {
     Option(keyValueMetaData.get("geo")).map { geo =>
       implicit val formats: org.json4s.Formats = org.json4s.DefaultFormats
-      parse(geo).camelizeKeys.extract[GeoParquetMetaData]
+      val geoObject = parse(geo)
+      val metadata = geoObject.camelizeKeys.extract[GeoParquetMetaData]
+      metadata.copy(columns = metadata.columns.map { case (name, column) =>
+        // Postprocess to distinguish between null (JNull) and missing field (JNothing).
+        geoObject \ "columns" \ name \ "crs" match {
+          case JNothing => name -> column.copy(crs = None)
+          case JNull => name -> column.copy(crs = Some(JNull))
+          case _ => name -> column
+        }
+      })
     }
   }
 }
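
The postprocessing above relies on json4s returning JNothing for an absent key and JNull for an explicit JSON null. A minimal standalone sketch of that behaviour (the JSON literals are illustrative only):

    import org.json4s.{JNothing, JNull}
    import org.json4s.jackson.JsonMethods.parse

    // "crs" key missing vs. present with an explicit null; a plain extract[...] would
    // map both to None, which is why the copy(crs = ...) postprocessing is needed.
    val withoutCrs  = parse("""{"columns": {"geometry": {"encoding": "WKB"}}}""")
    val withNullCrs = parse("""{"columns": {"geometry": {"encoding": "WKB", "crs": null}}}""")

    assert((withoutCrs \ "columns" \ "geometry" \ "crs") == JNothing) // key absent
    assert((withNullCrs \ "columns" \ "geometry" \ "crs") == JNull)   // key present, but null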
diff --git a/spark/spark-3.0/src/main/scala/org/apache/spark/sql/execution/datasources/v2/geoparquet/metadata/GeoParquetMetadataPartitionReaderFactory.scala b/spark/spark-3.0/src/main/scala/org/apache/spark/sql/execution/datasources/v2/geoparquet/metadata/GeoParquetMetadataPartitionReaderFactory.scala
index fe1b4f8c5..545bbfb31 100644
--- a/spark/spark-3.0/src/main/scala/org/apache/spark/sql/execution/datasources/v2/geoparquet/metadata/GeoParquetMetadataPartitionReaderFactory.scala
+++ b/spark/spark-3.0/src/main/scala/org/apache/spark/sql/execution/datasources/v2/geoparquet/metadata/GeoParquetMetadataPartitionReaderFactory.scala
@@ -66,7 +66,8 @@ object GeoParquetMetadataPartitionReaderFactory {
             UTF8String.fromString(columnMetadata.encoding),
             new GenericArrayData(columnMetadata.geometryTypes.map(UTF8String.fromString).toArray),
             new GenericArrayData(columnMetadata.bbox.toArray),
-            columnMetadata.crs.map(projjson => UTF8String.fromString(compact(render(projjson)))).orNull)
+            columnMetadata.crs.map(projjson => UTF8String.fromString(compact(render(projjson))))
+              .getOrElse(UTF8String.fromString("")))
           val columnMetadataStruct = new GenericInternalRow(columnMetadataFields)
           UTF8String.fromString(columnName) -> columnMetadataStruct
         }
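
The net effect of the change above is a three-way string encoding of the crs field in the metadata table. A small illustrative helper (the function name is hypothetical) mirroring that mapping:

    import org.apache.spark.unsafe.types.UTF8String
    import org.json4s.{JNull, JValue}
    import org.json4s.jackson.JsonMethods.{compact, render}

    // None           -> ""      ("crs" absent from the GeoParquet metadata)
    // Some(JNull)    -> "null"  ("crs" present but explicitly null)
    // Some(projjson) -> compact PROJJSON string
    def crsToUtf8(crs: Option[JValue]): UTF8String =
      crs.map(projjson => UTF8String.fromString(compact(render(projjson))))
        .getOrElse(UTF8String.fromString(""))

Since compact(render(JNull)) renders as the literal string "null", the single getOrElse covers all three cases.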
diff --git a/spark/spark-3.0/src/test/scala/org/apache/sedona/sql/GeoParquetMetadataTests.scala b/spark/spark-3.0/src/test/scala/org/apache/sedona/sql/GeoParquetMetadataTests.scala
index 71e159542..44359defb 100644
--- a/spark/spark-3.0/src/test/scala/org/apache/sedona/sql/GeoParquetMetadataTests.scala
+++ b/spark/spark-3.0/src/test/scala/org/apache/sedona/sql/GeoParquetMetadataTests.scala
@@ -20,6 +20,7 @@ import scala.collection.JavaConverters._
 
 class GeoParquetMetadataTests extends TestBaseScala with BeforeAndAfterAll {
   val geoparquetdatalocation: String = resourceFolder + "geoparquet/"
+  val geoparquetoutputlocation: String = resourceFolder + "geoparquet/geoparquet_output/"
 
   describe("GeoParquet Metadata tests") {
     it("Reading GeoParquet Metadata") {
@@ -40,7 +41,8 @@ class GeoParquetMetadataTests extends TestBaseScala with BeforeAndAfterAll {
           columnMetadata.getAs[String]("encoding") == "WKB" &&
             
columnMetadata.getList[Any](columnMetadata.fieldIndex("bbox")).asScala.forall(_.isInstanceOf[Double])
 &&
             
columnMetadata.getList[Any](columnMetadata.fieldIndex("geometry_types")).asScala.forall(_.isInstanceOf[String])
 &&
-            columnMetadata.getAs[String]("crs").nonEmpty
+            columnMetadata.getAs[String]("crs").nonEmpty &&
+            columnMetadata.getAs[String]("crs") != "null"
         }
       })
     }
@@ -63,5 +65,29 @@ class GeoParquetMetadataTests extends TestBaseScala with BeforeAndAfterAll {
       assert(metadataArray.forall(_.getAs[String]("primary_column") == null))
       assert(metadataArray.forall(_.getAs[String]("columns") == null))
     }
+
+    it("Read GeoParquet without CRS") {
+      val df = sparkSession.read.format("geoparquet").load(geoparquetdatalocation + "/example-1.0.0-beta.1.parquet")
+      val geoParquetSavePath = geoparquetoutputlocation + "/gp_crs_omit.parquet"
+      df.write.format("geoparquet")
+        .option("geoparquet.crs", "")
+        .mode("overwrite").save(geoParquetSavePath)
+      val dfMeta = sparkSession.read.format("geoparquet.metadata").load(geoParquetSavePath)
+      val row = dfMeta.collect()(0)
+      val metadata = row.getJavaMap(row.fieldIndex("columns")).get("geometry").asInstanceOf[Row]
+      assert(metadata.getAs[String]("crs") == "")
+    }
+
+    it("Read GeoParquet with null CRS") {
+      val df = sparkSession.read.format("geoparquet").load(geoparquetdatalocation + "/example-1.0.0-beta.1.parquet")
+      val geoParquetSavePath = geoparquetoutputlocation + "/gp_crs_null.parquet"
+      df.write.format("geoparquet")
+        .option("geoparquet.crs", "null")
+        .mode("overwrite").save(geoParquetSavePath)
+      val dfMeta = sparkSession.read.format("geoparquet.metadata").load(geoParquetSavePath)
+      val row = dfMeta.collect()(0)
+      val metadata = row.getJavaMap(row.fieldIndex("columns")).get("geometry").asInstanceOf[Row]
+      assert(metadata.getAs[String]("crs") == "null")
+    }
   }
 }
diff --git a/spark/spark-3.4/src/main/scala/org/apache/spark/sql/execution/datasources/v2/geoparquet/metadata/GeoParquetMetadataPartitionReaderFactory.scala b/spark/spark-3.4/src/main/scala/org/apache/spark/sql/execution/datasources/v2/geoparquet/metadata/GeoParquetMetadataPartitionReaderFactory.scala
index 874d2e743..c192a7a94 100644
--- a/spark/spark-3.4/src/main/scala/org/apache/spark/sql/execution/datasources/v2/geoparquet/metadata/GeoParquetMetadataPartitionReaderFactory.scala
+++ b/spark/spark-3.4/src/main/scala/org/apache/spark/sql/execution/datasources/v2/geoparquet/metadata/GeoParquetMetadataPartitionReaderFactory.scala
@@ -66,7 +66,8 @@ object GeoParquetMetadataPartitionReaderFactory {
             UTF8String.fromString(columnMetadata.encoding),
             new GenericArrayData(columnMetadata.geometryTypes.map(UTF8String.fromString).toArray),
             new GenericArrayData(columnMetadata.bbox.toArray),
-            columnMetadata.crs.map(projjson => UTF8String.fromString(compact(render(projjson)))).orNull)
+            columnMetadata.crs.map(projjson => UTF8String.fromString(compact(render(projjson))))
+              .getOrElse(UTF8String.fromString("")))
           val columnMetadataStruct = new GenericInternalRow(columnMetadataFields)
           UTF8String.fromString(columnName) -> columnMetadataStruct
         }
diff --git a/spark/spark-3.4/src/test/scala/org/apache/sedona/sql/GeoParquetMetadataTests.scala b/spark/spark-3.4/src/test/scala/org/apache/sedona/sql/GeoParquetMetadataTests.scala
index 71e159542..44359defb 100644
--- a/spark/spark-3.4/src/test/scala/org/apache/sedona/sql/GeoParquetMetadataTests.scala
+++ b/spark/spark-3.4/src/test/scala/org/apache/sedona/sql/GeoParquetMetadataTests.scala
@@ -20,6 +20,7 @@ import scala.collection.JavaConverters._
 
 class GeoParquetMetadataTests extends TestBaseScala with BeforeAndAfterAll {
   val geoparquetdatalocation: String = resourceFolder + "geoparquet/"
+  val geoparquetoutputlocation: String = resourceFolder + "geoparquet/geoparquet_output/"
 
   describe("GeoParquet Metadata tests") {
     it("Reading GeoParquet Metadata") {
@@ -40,7 +41,8 @@ class GeoParquetMetadataTests extends TestBaseScala with BeforeAndAfterAll {
           columnMetadata.getAs[String]("encoding") == "WKB" &&
             
columnMetadata.getList[Any](columnMetadata.fieldIndex("bbox")).asScala.forall(_.isInstanceOf[Double])
 &&
             
columnMetadata.getList[Any](columnMetadata.fieldIndex("geometry_types")).asScala.forall(_.isInstanceOf[String])
 &&
-            columnMetadata.getAs[String]("crs").nonEmpty
+            columnMetadata.getAs[String]("crs").nonEmpty &&
+            columnMetadata.getAs[String]("crs") != "null"
         }
       })
     }
@@ -63,5 +65,29 @@ class GeoParquetMetadataTests extends TestBaseScala with BeforeAndAfterAll {
       assert(metadataArray.forall(_.getAs[String]("primary_column") == null))
       assert(metadataArray.forall(_.getAs[String]("columns") == null))
     }
+
+    it("Read GeoParquet without CRS") {
+      val df = sparkSession.read.format("geoparquet").load(geoparquetdatalocation + "/example-1.0.0-beta.1.parquet")
+      val geoParquetSavePath = geoparquetoutputlocation + "/gp_crs_omit.parquet"
+      df.write.format("geoparquet")
+        .option("geoparquet.crs", "")
+        .mode("overwrite").save(geoParquetSavePath)
+      val dfMeta = sparkSession.read.format("geoparquet.metadata").load(geoParquetSavePath)
+      val row = dfMeta.collect()(0)
+      val metadata = row.getJavaMap(row.fieldIndex("columns")).get("geometry").asInstanceOf[Row]
+      assert(metadata.getAs[String]("crs") == "")
+    }
+
+    it("Read GeoParquet with null CRS") {
+      val df = sparkSession.read.format("geoparquet").load(geoparquetdatalocation + "/example-1.0.0-beta.1.parquet")
+      val geoParquetSavePath = geoparquetoutputlocation + "/gp_crs_null.parquet"
+      df.write.format("geoparquet")
+        .option("geoparquet.crs", "null")
+        .mode("overwrite").save(geoParquetSavePath)
+      val dfMeta = sparkSession.read.format("geoparquet.metadata").load(geoParquetSavePath)
+      val row = dfMeta.collect()(0)
+      val metadata = row.getJavaMap(row.fieldIndex("columns")).get("geometry").asInstanceOf[Row]
+      assert(metadata.getAs[String]("crs") == "null")
+    }
   }
 }
diff --git a/spark/spark-3.5/src/main/scala/org/apache/spark/sql/execution/datasources/v2/geoparquet/metadata/GeoParquetMetadataPartitionReaderFactory.scala b/spark/spark-3.5/src/main/scala/org/apache/spark/sql/execution/datasources/v2/geoparquet/metadata/GeoParquetMetadataPartitionReaderFactory.scala
index 874d2e743..c192a7a94 100644
--- a/spark/spark-3.5/src/main/scala/org/apache/spark/sql/execution/datasources/v2/geoparquet/metadata/GeoParquetMetadataPartitionReaderFactory.scala
+++ b/spark/spark-3.5/src/main/scala/org/apache/spark/sql/execution/datasources/v2/geoparquet/metadata/GeoParquetMetadataPartitionReaderFactory.scala
@@ -66,7 +66,8 @@ object GeoParquetMetadataPartitionReaderFactory {
             UTF8String.fromString(columnMetadata.encoding),
             new GenericArrayData(columnMetadata.geometryTypes.map(UTF8String.fromString).toArray),
             new GenericArrayData(columnMetadata.bbox.toArray),
-            columnMetadata.crs.map(projjson => UTF8String.fromString(compact(render(projjson)))).orNull)
+            columnMetadata.crs.map(projjson => UTF8String.fromString(compact(render(projjson))))
+              .getOrElse(UTF8String.fromString("")))
           val columnMetadataStruct = new GenericInternalRow(columnMetadataFields)
           UTF8String.fromString(columnName) -> columnMetadataStruct
         }
diff --git a/spark/spark-3.5/src/test/scala/org/apache/sedona/sql/GeoParquetMetadataTests.scala b/spark/spark-3.5/src/test/scala/org/apache/sedona/sql/GeoParquetMetadataTests.scala
index 71e159542..44359defb 100644
--- a/spark/spark-3.5/src/test/scala/org/apache/sedona/sql/GeoParquetMetadataTests.scala
+++ b/spark/spark-3.5/src/test/scala/org/apache/sedona/sql/GeoParquetMetadataTests.scala
@@ -20,6 +20,7 @@ import scala.collection.JavaConverters._
 
 class GeoParquetMetadataTests extends TestBaseScala with BeforeAndAfterAll {
   val geoparquetdatalocation: String = resourceFolder + "geoparquet/"
+  val geoparquetoutputlocation: String = resourceFolder + "geoparquet/geoparquet_output/"
 
   describe("GeoParquet Metadata tests") {
     it("Reading GeoParquet Metadata") {
@@ -40,7 +41,8 @@ class GeoParquetMetadataTests extends TestBaseScala with BeforeAndAfterAll {
           columnMetadata.getAs[String]("encoding") == "WKB" &&
             
columnMetadata.getList[Any](columnMetadata.fieldIndex("bbox")).asScala.forall(_.isInstanceOf[Double])
 &&
             
columnMetadata.getList[Any](columnMetadata.fieldIndex("geometry_types")).asScala.forall(_.isInstanceOf[String])
 &&
-            columnMetadata.getAs[String]("crs").nonEmpty
+            columnMetadata.getAs[String]("crs").nonEmpty &&
+            columnMetadata.getAs[String]("crs") != "null"
         }
       })
     }
@@ -63,5 +65,29 @@ class GeoParquetMetadataTests extends TestBaseScala with BeforeAndAfterAll {
       assert(metadataArray.forall(_.getAs[String]("primary_column") == null))
       assert(metadataArray.forall(_.getAs[String]("columns") == null))
     }
+
+    it("Read GeoParquet without CRS") {
+      val df = sparkSession.read.format("geoparquet").load(geoparquetdatalocation + "/example-1.0.0-beta.1.parquet")
+      val geoParquetSavePath = geoparquetoutputlocation + "/gp_crs_omit.parquet"
+      df.write.format("geoparquet")
+        .option("geoparquet.crs", "")
+        .mode("overwrite").save(geoParquetSavePath)
+      val dfMeta = sparkSession.read.format("geoparquet.metadata").load(geoParquetSavePath)
+      val row = dfMeta.collect()(0)
+      val metadata = row.getJavaMap(row.fieldIndex("columns")).get("geometry").asInstanceOf[Row]
+      assert(metadata.getAs[String]("crs") == "")
+    }
+
+    it("Read GeoParquet with null CRS") {
+      val df = sparkSession.read.format("geoparquet").load(geoparquetdatalocation + "/example-1.0.0-beta.1.parquet")
+      val geoParquetSavePath = geoparquetoutputlocation + "/gp_crs_null.parquet"
+      df.write.format("geoparquet")
+        .option("geoparquet.crs", "null")
+        .mode("overwrite").save(geoParquetSavePath)
+      val dfMeta = sparkSession.read.format("geoparquet.metadata").load(geoParquetSavePath)
+      val row = dfMeta.collect()(0)
+      val metadata = row.getJavaMap(row.fieldIndex("columns")).get("geometry").asInstanceOf[Row]
+      assert(metadata.getAs[String]("crs") == "null")
+    }
   }
 }
