This is an automated email from the ASF dual-hosted git repository.
jiayu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/sedona.git
The following commit(s) were added to refs/heads/master by this push:
     new fea229f7e [SEDONA-511] Fix reading/writing geoparquet metadata for snake_case or camelCase column names (#1270)
fea229f7e is described below
commit fea229f7e1856caf16529289264b50648ea14195
Author: Kristin Cowalcijk <[email protected]>
AuthorDate: Wed Mar 6 22:54:28 2024 +0800
    [SEDONA-511] Fix reading/writing geoparquet metadata for snake_case or camelCase column names (#1270)
* Fix geoparquet metadata for snake_case and camelCase geometry column names
* Apply the change to Spark 3.4 and 3.5
* Fix binary compatibility issue for Spark 3.0.x
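
    For context, a minimal standalone json4s sketch of the behavior being fixed
    (not part of this commit; the column name "geomColumn2" is hypothetical):
    underscoreKeys and camelizeKeys rewrite every key in a JSON tree, including
    the user-supplied column names that serve as keys of the GeoParquet "columns"
    map, so mixed-case column names could not survive a write/read round trip.

        import org.json4s._
        import org.json4s.jackson.JsonMethods._

        val geo = parse("""{"primaryColumn":"geomColumn2","columns":{"geomColumn2":{"encoding":"WKB"}}}""")
        // underscoreKeys snake_cases *all* keys, including the column-name key:
        compact(render(geo.underscoreKeys))
        // => {"primary_column":"geomColumn2","columns":{"geom_column2":{"encoding":"WKB"}}}
        // Likewise, camelizeKeys on the read path turns a key like "geom_column_1"
        // into "geomColumn1", so lookups by the original column name fail.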
---
.../datasources/parquet/GeoParquetMetaData.scala | 41 ++++++++++++++++++----
.../parquet/GeoParquetWriteSupport.scala | 3 +-
.../sedona/sql/GeoParquetMetadataTests.scala | 23 ++++++++++++
.../org/apache/sedona/sql/geoparquetIOTests.scala | 34 ++++++++++++++++--
.../parquet/GeoParquetWriteSupport.scala | 3 +-
.../sedona/sql/GeoParquetMetadataTests.scala | 23 ++++++++++++
.../org/apache/sedona/sql/geoparquetIOTests.scala | 34 ++++++++++++++++--
.../parquet/GeoParquetWriteSupport.scala | 3 +-
.../sedona/sql/GeoParquetMetadataTests.scala | 23 ++++++++++++
.../org/apache/sedona/sql/geoparquetIOTests.scala | 34 ++++++++++++++++--
10 files changed, 202 insertions(+), 19 deletions(-)
diff --git a/spark/common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/GeoParquetMetaData.scala b/spark/common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/GeoParquetMetaData.scala
index 7a88a111b..9e8f813ba 100644
--- a/spark/common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/GeoParquetMetaData.scala
+++ b/spark/common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/GeoParquetMetaData.scala
@@ -14,7 +14,8 @@
package org.apache.spark.sql.execution.datasources.parquet
import org.json4s.jackson.JsonMethods.parse
-import org.json4s.{JNothing, JNull, JValue}
+import org.json4s.jackson.compactJson
+import org.json4s.{DefaultFormats, Extraction, JField, JNothing, JNull, JObject, JValue}
/**
 * A case class that holds the metadata of geometry column in GeoParquet metadata
@@ -62,14 +63,40 @@ object GeoParquetMetaData {
       implicit val formats: org.json4s.Formats = org.json4s.DefaultFormats
       val geoObject = parse(geo)
       val metadata = geoObject.camelizeKeys.extract[GeoParquetMetaData]
-      metadata.copy(columns = metadata.columns.map { case (name, column) =>
+      val columns = (geoObject \ "columns").extract[Map[String, JValue]].map { case (name, columnObject) =>
+        val fieldMetadata = columnObject.camelizeKeys.extract[GeometryFieldMetaData]
         // Postprocess to distinguish between null (JNull) and missing field (JNothing).
-        geoObject \ "columns" \ name \ "crs" match {
-          case JNothing => name -> column.copy(crs = None)
-          case JNull => name -> column.copy(crs = Some(JNull))
-          case _ => name -> column
+        columnObject \ "crs" match {
+          case JNothing => name -> fieldMetadata.copy(crs = None)
+          case JNull => name -> fieldMetadata.copy(crs = Some(JNull))
+          case _ => name -> fieldMetadata
         }
-      })
+      }
+      metadata.copy(columns = columns)
     }
   }
+
+  def toJson(geoParquetMetadata: GeoParquetMetaData): String = {
+    implicit val formats: org.json4s.Formats = DefaultFormats
+    val geoObject = Extraction.decompose(geoParquetMetadata)
+
+    // Make sure that the keys of columns are not transformed to camel case, so we use the columns map with
+    // original keys to replace the transformed columns map.
+    val columnsMap = (geoObject \ "columns").extract[Map[String, JValue]].map { case (name, columnObject) =>
+      name -> columnObject.underscoreKeys
+    }
+
+    // We are not using transformField here for binary compatibility with various json4s versions shipped with
+    // Spark 3.0.x ~ Spark 3.5.x
+    val serializedGeoObject = geoObject.underscoreKeys mapField {
+      case field@(jField: JField) =>
+        if (jField._1 == "columns") {
+          JField("columns", JObject(columnsMap.toList))
+        } else {
+          field
+        }
+      case field: Any => field
+    }
+    compactJson(serializedGeoObject)
+  }
 }
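
The same splicing technique in isolation, as a standalone json4s sketch
(hypothetical input, mirroring the toJson logic above): snake_case the whole
tree, then put the columns object with its original keys back.

    import org.json4s._
    import org.json4s.jackson.JsonMethods._

    val geoObject = parse("""{"primaryColumn":"geomColumn2","columns":{"geomColumn2":{"geometryTypes":[]}}}""")
    // Snake_case each column's metadata while leaving the column-name key untouched.
    val columnsMap = (geoObject \ "columns") match {
      case JObject(fields) => fields.map { case (name, value) => JField(name, value.underscoreKeys) }
      case _ => Nil
    }
    // Snake_case everything else, then swap the original-keyed columns object back in.
    val serialized = geoObject.underscoreKeys mapField {
      case JField("columns", _) => JField("columns", JObject(columnsMap))
      case field => field
    }
    compact(render(serialized))
    // => {"primary_column":"geomColumn2","columns":{"geomColumn2":{"geometry_types":[]}}}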
diff --git a/spark/spark-3.0/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/GeoParquetWriteSupport.scala b/spark/spark-3.0/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/GeoParquetWriteSupport.scala
index 2577937a5..e97d2f95e 100644
--- a/spark/spark-3.0/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/GeoParquetWriteSupport.scala
+++ b/spark/spark-3.0/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/GeoParquetWriteSupport.scala
@@ -201,8 +201,7 @@ class GeoParquetWriteSupport extends WriteSupport[InternalRow] with Logging {
         columnName -> GeometryFieldMetaData("WKB", geometryTypes, bbox, crs)
       }.toMap
       val geoParquetMetadata = GeoParquetMetaData(geoParquetVersion, primaryColumn, columns)
-      implicit val formats: org.json4s.Formats = DefaultFormats
-      val geoParquetMetadataJson = compactJson(Extraction.decompose(geoParquetMetadata).underscoreKeys)
+      val geoParquetMetadataJson = GeoParquetMetaData.toJson(geoParquetMetadata)
       metadata.put("geo", geoParquetMetadataJson)
     }
     new FinalizedWriteContext(metadata)
diff --git a/spark/spark-3.0/src/test/scala/org/apache/sedona/sql/GeoParquetMetadataTests.scala b/spark/spark-3.0/src/test/scala/org/apache/sedona/sql/GeoParquetMetadataTests.scala
index 44359defb..03cf5ff91 100644
--- a/spark/spark-3.0/src/test/scala/org/apache/sedona/sql/GeoParquetMetadataTests.scala
+++ b/spark/spark-3.0/src/test/scala/org/apache/sedona/sql/GeoParquetMetadataTests.scala
@@ -14,8 +14,11 @@
package org.apache.sedona.sql
import org.apache.spark.sql.Row
+import org.apache.spark.sql.sedona_sql.UDT.GeometryUDT
+import org.apache.spark.sql.types.{IntegerType, StructField, StructType}
import org.scalatest.BeforeAndAfterAll
+import java.util.Collections
import scala.collection.JavaConverters._
class GeoParquetMetadataTests extends TestBaseScala with BeforeAndAfterAll {
@@ -89,5 +92,25 @@ class GeoParquetMetadataTests extends TestBaseScala with BeforeAndAfterAll {
       val metadata = row.getJavaMap(row.fieldIndex("columns")).get("geometry").asInstanceOf[Row]
       assert(metadata.getAs[String]("crs") == "null")
     }
+
+    it("Read GeoParquet with snake_case geometry column name and camelCase column name") {
+      val schema = StructType(Seq(
+        StructField("id", IntegerType, nullable = false),
+        StructField("geom_column_1", GeometryUDT, nullable = false),
+        StructField("geomColumn2", GeometryUDT, nullable = false)
+      ))
+      val df = sparkSession.createDataFrame(Collections.emptyList[Row](), schema)
+      val geoParquetSavePath = geoparquetoutputlocation + "/gp_column_name_styles.parquet"
+      df.write.format("geoparquet").mode("overwrite").save(geoParquetSavePath)
+
+      val dfMeta = sparkSession.read.format("geoparquet.metadata").load(geoParquetSavePath)
+      val row = dfMeta.collect()(0)
+      val metadata = row.getJavaMap(row.fieldIndex("columns"))
+      assert(metadata.containsKey("geom_column_1"))
+      assert(!metadata.containsKey("geoColumn1"))
+      assert(metadata.containsKey("geomColumn2"))
+      assert(!metadata.containsKey("geom_column2"))
+      assert(!metadata.containsKey("geom_column_2"))
+    }
   }
 }
diff --git a/spark/spark-3.0/src/test/scala/org/apache/sedona/sql/geoparquetIOTests.scala b/spark/spark-3.0/src/test/scala/org/apache/sedona/sql/geoparquetIOTests.scala
index c2d59756b..7a556cddd 100644
--- a/spark/spark-3.0/src/test/scala/org/apache/sedona/sql/geoparquetIOTests.scala
+++ b/spark/spark-3.0/src/test/scala/org/apache/sedona/sql/geoparquetIOTests.scala
@@ -141,7 +141,7 @@ class geoparquetIOTests extends TestBaseScala with BeforeAndAfterAll {
       ))
       val df = sparkSession.createDataFrame(testData.asJava, schema).repartition(1)
       val geoParquetSavePath = geoparquetoutputlocation + "/multi_geoms.parquet"
-      df.write.format("geoparquet").save(geoParquetSavePath)
+      df.write.format("geoparquet").mode("overwrite").save(geoParquetSavePath)
 
       // Find parquet files in geoParquetSavePath directory and validate their metadata
       validateGeoParquetMetadata(geoParquetSavePath) { geo =>
@@ -175,7 +175,7 @@ class geoparquetIOTests extends TestBaseScala with BeforeAndAfterAll {
       ))
       val df = sparkSession.createDataFrame(Collections.emptyList[Row](), schema)
       val geoParquetSavePath = geoparquetoutputlocation + "/empty.parquet"
-      df.write.format("geoparquet").save(geoParquetSavePath)
+      df.write.format("geoparquet").mode("overwrite").save(geoParquetSavePath)
       val df2 = sparkSession.read.format("geoparquet").load(geoParquetSavePath)
       assert(df2.schema.fields(1).dataType.isInstanceOf[GeometryUDT])
       assert(0 == df2.count())
@@ -189,6 +189,36 @@ class geoparquetIOTests extends TestBaseScala with BeforeAndAfterAll {
       }
     }
 
+    it("GeoParquet save should work with snake_case column names") {
+      val schema = StructType(Seq(
+        StructField("id", IntegerType, nullable = false),
+        StructField("geom_column", GeometryUDT, nullable = false)
+      ))
+      val df = sparkSession.createDataFrame(Collections.emptyList[Row](), schema)
+      val geoParquetSavePath = geoparquetoutputlocation + "/snake_case_column_name.parquet"
+      df.write.format("geoparquet").mode("overwrite").save(geoParquetSavePath)
+      val df2 = sparkSession.read.format("geoparquet").load(geoParquetSavePath)
+      val geomField = df2.schema.fields(1)
+      assert(geomField.name == "geom_column")
+      assert(geomField.dataType.isInstanceOf[GeometryUDT])
+      assert(0 == df2.count())
+    }
+
+    it("GeoParquet save should work with camelCase column names") {
+      val schema = StructType(Seq(
+        StructField("id", IntegerType, nullable = false),
+        StructField("geomColumn", GeometryUDT, nullable = false)
+      ))
+      val df = sparkSession.createDataFrame(Collections.emptyList[Row](), schema)
+      val geoParquetSavePath = geoparquetoutputlocation + "/camel_case_column_name.parquet"
+      df.write.format("geoparquet").mode("overwrite").save(geoParquetSavePath)
+      val df2 = sparkSession.read.format("geoparquet").load(geoParquetSavePath)
+      val geomField = df2.schema.fields(1)
+      assert(geomField.name == "geomColumn")
+      assert(geomField.dataType.isInstanceOf[GeometryUDT])
+      assert(0 == df2.count())
+    }
+
     it("GeoParquet save should write user specified version and crs to geo metadata") {
       val df = sparkSession.read.format("geoparquet").load(geoparquetdatalocation4)
       // This CRS is taken from https://proj.org/en/9.3/specifications/projjson.html#geographiccrs
diff --git a/spark/spark-3.4/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/GeoParquetWriteSupport.scala b/spark/spark-3.4/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/GeoParquetWriteSupport.scala
index 984edb5c0..02aec14a2 100644
--- a/spark/spark-3.4/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/GeoParquetWriteSupport.scala
+++ b/spark/spark-3.4/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/GeoParquetWriteSupport.scala
@@ -201,8 +201,7 @@ class GeoParquetWriteSupport extends WriteSupport[InternalRow] with Logging {
         columnName -> GeometryFieldMetaData("WKB", geometryTypes, bbox, crs)
       }.toMap
       val geoParquetMetadata = GeoParquetMetaData(geoParquetVersion, primaryColumn, columns)
-      implicit val formats: org.json4s.Formats = DefaultFormats
-      val geoParquetMetadataJson = compactJson(Extraction.decompose(geoParquetMetadata).underscoreKeys)
+      val geoParquetMetadataJson = GeoParquetMetaData.toJson(geoParquetMetadata)
       metadata.put("geo", geoParquetMetadataJson)
     }
     new FinalizedWriteContext(metadata)
diff --git a/spark/spark-3.4/src/test/scala/org/apache/sedona/sql/GeoParquetMetadataTests.scala b/spark/spark-3.4/src/test/scala/org/apache/sedona/sql/GeoParquetMetadataTests.scala
index 44359defb..03cf5ff91 100644
--- a/spark/spark-3.4/src/test/scala/org/apache/sedona/sql/GeoParquetMetadataTests.scala
+++ b/spark/spark-3.4/src/test/scala/org/apache/sedona/sql/GeoParquetMetadataTests.scala
@@ -14,8 +14,11 @@
package org.apache.sedona.sql
import org.apache.spark.sql.Row
+import org.apache.spark.sql.sedona_sql.UDT.GeometryUDT
+import org.apache.spark.sql.types.{IntegerType, StructField, StructType}
import org.scalatest.BeforeAndAfterAll
+import java.util.Collections
import scala.collection.JavaConverters._
class GeoParquetMetadataTests extends TestBaseScala with BeforeAndAfterAll {
@@ -89,5 +92,25 @@ class GeoParquetMetadataTests extends TestBaseScala with BeforeAndAfterAll {
       val metadata = row.getJavaMap(row.fieldIndex("columns")).get("geometry").asInstanceOf[Row]
       assert(metadata.getAs[String]("crs") == "null")
     }
+
+    it("Read GeoParquet with snake_case geometry column name and camelCase column name") {
+      val schema = StructType(Seq(
+        StructField("id", IntegerType, nullable = false),
+        StructField("geom_column_1", GeometryUDT, nullable = false),
+        StructField("geomColumn2", GeometryUDT, nullable = false)
+      ))
+      val df = sparkSession.createDataFrame(Collections.emptyList[Row](), schema)
+      val geoParquetSavePath = geoparquetoutputlocation + "/gp_column_name_styles.parquet"
+      df.write.format("geoparquet").mode("overwrite").save(geoParquetSavePath)
+
+      val dfMeta = sparkSession.read.format("geoparquet.metadata").load(geoParquetSavePath)
+      val row = dfMeta.collect()(0)
+      val metadata = row.getJavaMap(row.fieldIndex("columns"))
+      assert(metadata.containsKey("geom_column_1"))
+      assert(!metadata.containsKey("geoColumn1"))
+      assert(metadata.containsKey("geomColumn2"))
+      assert(!metadata.containsKey("geom_column2"))
+      assert(!metadata.containsKey("geom_column_2"))
+    }
   }
 }
diff --git a/spark/spark-3.4/src/test/scala/org/apache/sedona/sql/geoparquetIOTests.scala b/spark/spark-3.4/src/test/scala/org/apache/sedona/sql/geoparquetIOTests.scala
index c2d59756b..7a556cddd 100644
--- a/spark/spark-3.4/src/test/scala/org/apache/sedona/sql/geoparquetIOTests.scala
+++ b/spark/spark-3.4/src/test/scala/org/apache/sedona/sql/geoparquetIOTests.scala
@@ -141,7 +141,7 @@ class geoparquetIOTests extends TestBaseScala with BeforeAndAfterAll {
       ))
       val df = sparkSession.createDataFrame(testData.asJava, schema).repartition(1)
       val geoParquetSavePath = geoparquetoutputlocation + "/multi_geoms.parquet"
-      df.write.format("geoparquet").save(geoParquetSavePath)
+      df.write.format("geoparquet").mode("overwrite").save(geoParquetSavePath)
 
       // Find parquet files in geoParquetSavePath directory and validate their metadata
       validateGeoParquetMetadata(geoParquetSavePath) { geo =>
@@ -175,7 +175,7 @@ class geoparquetIOTests extends TestBaseScala with BeforeAndAfterAll {
       ))
       val df = sparkSession.createDataFrame(Collections.emptyList[Row](), schema)
       val geoParquetSavePath = geoparquetoutputlocation + "/empty.parquet"
-      df.write.format("geoparquet").save(geoParquetSavePath)
+      df.write.format("geoparquet").mode("overwrite").save(geoParquetSavePath)
       val df2 = sparkSession.read.format("geoparquet").load(geoParquetSavePath)
       assert(df2.schema.fields(1).dataType.isInstanceOf[GeometryUDT])
       assert(0 == df2.count())
@@ -189,6 +189,36 @@ class geoparquetIOTests extends TestBaseScala with BeforeAndAfterAll {
       }
     }
 
+    it("GeoParquet save should work with snake_case column names") {
+      val schema = StructType(Seq(
+        StructField("id", IntegerType, nullable = false),
+        StructField("geom_column", GeometryUDT, nullable = false)
+      ))
+      val df = sparkSession.createDataFrame(Collections.emptyList[Row](), schema)
+      val geoParquetSavePath = geoparquetoutputlocation + "/snake_case_column_name.parquet"
+      df.write.format("geoparquet").mode("overwrite").save(geoParquetSavePath)
+      val df2 = sparkSession.read.format("geoparquet").load(geoParquetSavePath)
+      val geomField = df2.schema.fields(1)
+      assert(geomField.name == "geom_column")
+      assert(geomField.dataType.isInstanceOf[GeometryUDT])
+      assert(0 == df2.count())
+    }
+
+    it("GeoParquet save should work with camelCase column names") {
+      val schema = StructType(Seq(
+        StructField("id", IntegerType, nullable = false),
+        StructField("geomColumn", GeometryUDT, nullable = false)
+      ))
+      val df = sparkSession.createDataFrame(Collections.emptyList[Row](), schema)
+      val geoParquetSavePath = geoparquetoutputlocation + "/camel_case_column_name.parquet"
+      df.write.format("geoparquet").mode("overwrite").save(geoParquetSavePath)
+      val df2 = sparkSession.read.format("geoparquet").load(geoParquetSavePath)
+      val geomField = df2.schema.fields(1)
+      assert(geomField.name == "geomColumn")
+      assert(geomField.dataType.isInstanceOf[GeometryUDT])
+      assert(0 == df2.count())
+    }
+
     it("GeoParquet save should write user specified version and crs to geo metadata") {
       val df = sparkSession.read.format("geoparquet").load(geoparquetdatalocation4)
       // This CRS is taken from https://proj.org/en/9.3/specifications/projjson.html#geographiccrs
diff --git a/spark/spark-3.5/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/GeoParquetWriteSupport.scala b/spark/spark-3.5/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/GeoParquetWriteSupport.scala
index aa81f8725..5e47bec78 100644
--- a/spark/spark-3.5/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/GeoParquetWriteSupport.scala
+++ b/spark/spark-3.5/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/GeoParquetWriteSupport.scala
@@ -201,8 +201,7 @@ class GeoParquetWriteSupport extends WriteSupport[InternalRow] with Logging {
         columnName -> GeometryFieldMetaData("WKB", geometryTypes, bbox, crs)
       }.toMap
       val geoParquetMetadata = GeoParquetMetaData(geoParquetVersion, primaryColumn, columns)
-      implicit val formats: org.json4s.Formats = DefaultFormats
-      val geoParquetMetadataJson = compactJson(Extraction.decompose(geoParquetMetadata).underscoreKeys)
+      val geoParquetMetadataJson = GeoParquetMetaData.toJson(geoParquetMetadata)
       metadata.put("geo", geoParquetMetadataJson)
     }
     new FinalizedWriteContext(metadata)
diff --git a/spark/spark-3.5/src/test/scala/org/apache/sedona/sql/GeoParquetMetadataTests.scala b/spark/spark-3.5/src/test/scala/org/apache/sedona/sql/GeoParquetMetadataTests.scala
index 44359defb..03cf5ff91 100644
--- a/spark/spark-3.5/src/test/scala/org/apache/sedona/sql/GeoParquetMetadataTests.scala
+++ b/spark/spark-3.5/src/test/scala/org/apache/sedona/sql/GeoParquetMetadataTests.scala
@@ -14,8 +14,11 @@
package org.apache.sedona.sql
import org.apache.spark.sql.Row
+import org.apache.spark.sql.sedona_sql.UDT.GeometryUDT
+import org.apache.spark.sql.types.{IntegerType, StructField, StructType}
import org.scalatest.BeforeAndAfterAll
+import java.util.Collections
import scala.collection.JavaConverters._
class GeoParquetMetadataTests extends TestBaseScala with BeforeAndAfterAll {
@@ -89,5 +92,25 @@ class GeoParquetMetadataTests extends TestBaseScala with BeforeAndAfterAll {
       val metadata = row.getJavaMap(row.fieldIndex("columns")).get("geometry").asInstanceOf[Row]
       assert(metadata.getAs[String]("crs") == "null")
     }
+
+    it("Read GeoParquet with snake_case geometry column name and camelCase column name") {
+      val schema = StructType(Seq(
+        StructField("id", IntegerType, nullable = false),
+        StructField("geom_column_1", GeometryUDT, nullable = false),
+        StructField("geomColumn2", GeometryUDT, nullable = false)
+      ))
+      val df = sparkSession.createDataFrame(Collections.emptyList[Row](), schema)
+      val geoParquetSavePath = geoparquetoutputlocation + "/gp_column_name_styles.parquet"
+      df.write.format("geoparquet").mode("overwrite").save(geoParquetSavePath)
+
+      val dfMeta = sparkSession.read.format("geoparquet.metadata").load(geoParquetSavePath)
+      val row = dfMeta.collect()(0)
+      val metadata = row.getJavaMap(row.fieldIndex("columns"))
+      assert(metadata.containsKey("geom_column_1"))
+      assert(!metadata.containsKey("geoColumn1"))
+      assert(metadata.containsKey("geomColumn2"))
+      assert(!metadata.containsKey("geom_column2"))
+      assert(!metadata.containsKey("geom_column_2"))
+    }
   }
 }
diff --git a/spark/spark-3.5/src/test/scala/org/apache/sedona/sql/geoparquetIOTests.scala b/spark/spark-3.5/src/test/scala/org/apache/sedona/sql/geoparquetIOTests.scala
index c2d59756b..7a556cddd 100644
--- a/spark/spark-3.5/src/test/scala/org/apache/sedona/sql/geoparquetIOTests.scala
+++ b/spark/spark-3.5/src/test/scala/org/apache/sedona/sql/geoparquetIOTests.scala
@@ -141,7 +141,7 @@ class geoparquetIOTests extends TestBaseScala with BeforeAndAfterAll {
       ))
       val df = sparkSession.createDataFrame(testData.asJava, schema).repartition(1)
       val geoParquetSavePath = geoparquetoutputlocation + "/multi_geoms.parquet"
-      df.write.format("geoparquet").save(geoParquetSavePath)
+      df.write.format("geoparquet").mode("overwrite").save(geoParquetSavePath)
 
       // Find parquet files in geoParquetSavePath directory and validate their metadata
       validateGeoParquetMetadata(geoParquetSavePath) { geo =>
@@ -175,7 +175,7 @@ class geoparquetIOTests extends TestBaseScala with BeforeAndAfterAll {
       ))
       val df = sparkSession.createDataFrame(Collections.emptyList[Row](), schema)
       val geoParquetSavePath = geoparquetoutputlocation + "/empty.parquet"
-      df.write.format("geoparquet").save(geoParquetSavePath)
+      df.write.format("geoparquet").mode("overwrite").save(geoParquetSavePath)
       val df2 = sparkSession.read.format("geoparquet").load(geoParquetSavePath)
       assert(df2.schema.fields(1).dataType.isInstanceOf[GeometryUDT])
       assert(0 == df2.count())
@@ -189,6 +189,36 @@ class geoparquetIOTests extends TestBaseScala with BeforeAndAfterAll {
       }
     }
 
+    it("GeoParquet save should work with snake_case column names") {
+      val schema = StructType(Seq(
+        StructField("id", IntegerType, nullable = false),
+        StructField("geom_column", GeometryUDT, nullable = false)
+      ))
+      val df = sparkSession.createDataFrame(Collections.emptyList[Row](), schema)
+      val geoParquetSavePath = geoparquetoutputlocation + "/snake_case_column_name.parquet"
+      df.write.format("geoparquet").mode("overwrite").save(geoParquetSavePath)
+      val df2 = sparkSession.read.format("geoparquet").load(geoParquetSavePath)
+      val geomField = df2.schema.fields(1)
+      assert(geomField.name == "geom_column")
+      assert(geomField.dataType.isInstanceOf[GeometryUDT])
+      assert(0 == df2.count())
+    }
+
+    it("GeoParquet save should work with camelCase column names") {
+      val schema = StructType(Seq(
+        StructField("id", IntegerType, nullable = false),
+        StructField("geomColumn", GeometryUDT, nullable = false)
+      ))
+      val df = sparkSession.createDataFrame(Collections.emptyList[Row](), schema)
+      val geoParquetSavePath = geoparquetoutputlocation + "/camel_case_column_name.parquet"
+      df.write.format("geoparquet").mode("overwrite").save(geoParquetSavePath)
+      val df2 = sparkSession.read.format("geoparquet").load(geoParquetSavePath)
+      val geomField = df2.schema.fields(1)
+      assert(geomField.name == "geomColumn")
+      assert(geomField.dataType.isInstanceOf[GeometryUDT])
+      assert(0 == df2.count())
+    }
+
     it("GeoParquet save should write user specified version and crs to geo metadata") {
       val df = sparkSession.read.format("geoparquet").load(geoparquetdatalocation4)
       // This CRS is taken from https://proj.org/en/9.3/specifications/projjson.html#geographiccrs