This is an automated email from the ASF dual-hosted git repository.

jiayu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/sedona.git


The following commit(s) were added to refs/heads/master by this push:
     new fea229f7e [SEDONA-511] Fix reading/writing geoparquet metadata for snake_case or camelCase column names (#1270)
fea229f7e is described below

commit fea229f7e1856caf16529289264b50648ea14195
Author: Kristin Cowalcijk <[email protected]>
AuthorDate: Wed Mar 6 22:54:28 2024 +0800

    [SEDONA-511] Fix reading/writing geoparquet metadata for snake_case or camelCase column names (#1270)
    
    * Fix geoparquet metadata for snake_case and camelCase geometry column names
    
    * Apply the change to Spark 3.4 and 3.5
    
    * Fix binary compatibility issue for Spark 3.0.x
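    
    For context (illustrative, not part of this commit): json4s' camelizeKeys and
    underscoreKeys rewrite every key in a JSON tree, including the user-supplied
    column names that key the GeoParquet "columns" object. A minimal sketch of the
    key mangling that motivated this fix, assuming only json4s on the classpath:
    
        import org.json4s._
        import org.json4s.jackson.JsonMethods.{compact, parse, render}
    
        // camelizeKeys rewrites the column-name key itself, not just field names:
        val geo = parse("""{"columns": {"geom_column_1": {"encoding": "WKB"}}}""")
        println(compact(render(geo.camelizeKeys)))
        // prints {"columns":{"geomColumn1":{"encoding":"WKB"}}} -- the column
        // "geom_column_1" can no longer be looked up under its original name.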
---
 .../datasources/parquet/GeoParquetMetaData.scala   | 41 ++++++++++++++++++----
 .../parquet/GeoParquetWriteSupport.scala           |  3 +-
 .../sedona/sql/GeoParquetMetadataTests.scala       | 23 ++++++++++++
 .../org/apache/sedona/sql/geoparquetIOTests.scala  | 34 ++++++++++++++++--
 .../parquet/GeoParquetWriteSupport.scala           |  3 +-
 .../sedona/sql/GeoParquetMetadataTests.scala       | 23 ++++++++++++
 .../org/apache/sedona/sql/geoparquetIOTests.scala  | 34 ++++++++++++++++--
 .../parquet/GeoParquetWriteSupport.scala           |  3 +-
 .../sedona/sql/GeoParquetMetadataTests.scala       | 23 ++++++++++++
 .../org/apache/sedona/sql/geoparquetIOTests.scala  | 34 ++++++++++++++++--
 10 files changed, 202 insertions(+), 19 deletions(-)

diff --git a/spark/common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/GeoParquetMetaData.scala b/spark/common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/GeoParquetMetaData.scala
index 7a88a111b..9e8f813ba 100644
--- a/spark/common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/GeoParquetMetaData.scala
+++ b/spark/common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/GeoParquetMetaData.scala
@@ -14,7 +14,8 @@
 package org.apache.spark.sql.execution.datasources.parquet
 
 import org.json4s.jackson.JsonMethods.parse
-import org.json4s.{JNothing, JNull, JValue}
+import org.json4s.jackson.compactJson
+import org.json4s.{DefaultFormats, Extraction, JField, JNothing, JNull, JObject, JValue}
 
 /**
  * A case class that holds the metadata of geometry column in GeoParquet metadata
@@ -62,14 +63,40 @@ object GeoParquetMetaData {
       implicit val formats: org.json4s.Formats = org.json4s.DefaultFormats
       val geoObject = parse(geo)
       val metadata = geoObject.camelizeKeys.extract[GeoParquetMetaData]
-      metadata.copy(columns = metadata.columns.map { case (name, column) =>
+      val columns = (geoObject \ "columns").extract[Map[String, JValue]].map { case (name, columnObject) =>
+        val fieldMetadata = columnObject.camelizeKeys.extract[GeometryFieldMetaData]
         // Postprocess to distinguish between null (JNull) and missing field (JNothing).
-        geoObject \ "columns" \ name \ "crs" match {
-          case JNothing => name -> column.copy(crs = None)
-          case JNull => name -> column.copy(crs = Some(JNull))
-          case _ => name -> column
+        columnObject \ "crs" match {
+          case JNothing => name -> fieldMetadata.copy(crs = None)
+          case JNull => name -> fieldMetadata.copy(crs = Some(JNull))
+          case _ => name -> fieldMetadata
         }
-      })
+      }
+      metadata.copy(columns = columns)
     }
   }
+
+  def toJson(geoParquetMetadata: GeoParquetMetaData): String = {
+    implicit val formats: org.json4s.Formats = DefaultFormats
+    val geoObject = Extraction.decompose(geoParquetMetadata)
+
+    // Make sure that the keys of columns are not transformed to snake_case, so we use the columns map with
+    // original keys to replace the transformed columns map.
+    val columnsMap = (geoObject \ "columns").extract[Map[String, JValue]].map { case (name, columnObject) =>
+      name -> columnObject.underscoreKeys
+    }
+
+    // We are not using transformField here for binary compatibility with various json4s versions shipped with
+    // Spark 3.0.x ~ Spark 3.5.x
+    val serializedGeoObject = geoObject.underscoreKeys mapField {
+      case field@(jField: JField) =>
+        if (jField._1 == "columns") {
+          JField("columns", JObject(columnsMap.toList))
+        } else {
+          field
+        }
+      case field: Any => field
+    }
+    compactJson(serializedGeoObject)
+  }
 }
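
For context (illustrative, not part of this commit): the new toJson underscores
the metadata's own field names but restores the caller-supplied column keys. A
minimal usage sketch, assuming the case class shapes visible elsewhere in this
diff, GeoParquetMetaData(version, primaryColumn, columns) and
GeometryFieldMetaData(encoding, geometryTypes, bbox, crs):

    // Hypothetical metadata for a single camelCase geometry column.
    val columns = Map("geomColumn2" ->
      GeometryFieldMetaData("WKB", Seq("Point"), Seq(0.0, 0.0, 1.0, 1.0), None))
    val json = GeoParquetMetaData.toJson(
      GeoParquetMetaData(Some("1.0.0"), "geomColumn2", columns))
    // Top-level field names are snake_cased by underscoreKeys...
    assert(json.contains("\"primary_column\":\"geomColumn2\""))
    // ...but the key inside "columns" keeps its original camelCase spelling.
    assert(json.contains("\"geomColumn2\":{"))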
diff --git a/spark/spark-3.0/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/GeoParquetWriteSupport.scala b/spark/spark-3.0/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/GeoParquetWriteSupport.scala
index 2577937a5..e97d2f95e 100644
--- a/spark/spark-3.0/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/GeoParquetWriteSupport.scala
+++ b/spark/spark-3.0/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/GeoParquetWriteSupport.scala
@@ -201,8 +201,7 @@ class GeoParquetWriteSupport extends WriteSupport[InternalRow] with Logging {
         columnName -> GeometryFieldMetaData("WKB", geometryTypes, bbox, crs)
       }.toMap
       val geoParquetMetadata = GeoParquetMetaData(geoParquetVersion, primaryColumn, columns)
-      implicit val formats: org.json4s.Formats = DefaultFormats
-      val geoParquetMetadataJson = compactJson(Extraction.decompose(geoParquetMetadata).underscoreKeys)
+      val geoParquetMetadataJson = GeoParquetMetaData.toJson(geoParquetMetadata)
       metadata.put("geo", geoParquetMetadataJson)
     }
     new FinalizedWriteContext(metadata)
diff --git a/spark/spark-3.0/src/test/scala/org/apache/sedona/sql/GeoParquetMetadataTests.scala b/spark/spark-3.0/src/test/scala/org/apache/sedona/sql/GeoParquetMetadataTests.scala
index 44359defb..03cf5ff91 100644
--- a/spark/spark-3.0/src/test/scala/org/apache/sedona/sql/GeoParquetMetadataTests.scala
+++ b/spark/spark-3.0/src/test/scala/org/apache/sedona/sql/GeoParquetMetadataTests.scala
@@ -14,8 +14,11 @@
 package org.apache.sedona.sql
 
 import org.apache.spark.sql.Row
+import org.apache.spark.sql.sedona_sql.UDT.GeometryUDT
+import org.apache.spark.sql.types.{IntegerType, StructField, StructType}
 import org.scalatest.BeforeAndAfterAll
 
+import java.util.Collections
 import scala.collection.JavaConverters._
 
 class GeoParquetMetadataTests extends TestBaseScala with BeforeAndAfterAll {
@@ -89,5 +92,25 @@ class GeoParquetMetadataTests extends TestBaseScala with BeforeAndAfterAll {
       val metadata = row.getJavaMap(row.fieldIndex("columns")).get("geometry").asInstanceOf[Row]
       assert(metadata.getAs[String]("crs") == "null")
     }
+
+    it("Read GeoParquet with snake_case geometry column name and camelCase 
column name") {
+      val schema = StructType(Seq(
+        StructField("id", IntegerType, nullable = false),
+        StructField("geom_column_1", GeometryUDT, nullable = false),
+        StructField("geomColumn2", GeometryUDT, nullable = false)
+      ))
+      val df = sparkSession.createDataFrame(Collections.emptyList[Row](), schema)
+      val geoParquetSavePath = geoparquetoutputlocation + "/gp_column_name_styles.parquet"
+      df.write.format("geoparquet").mode("overwrite").save(geoParquetSavePath)
+
+      val dfMeta = sparkSession.read.format("geoparquet.metadata").load(geoParquetSavePath)
+      val row = dfMeta.collect()(0)
+      val metadata = row.getJavaMap(row.fieldIndex("columns"))
+      assert(metadata.containsKey("geom_column_1"))
+      assert(!metadata.containsKey("geoColumn1"))
+      assert(metadata.containsKey("geomColumn2"))
+      assert(!metadata.containsKey("geom_column2"))
+      assert(!metadata.containsKey("geom_column_2"))
+    }
   }
 }
diff --git a/spark/spark-3.0/src/test/scala/org/apache/sedona/sql/geoparquetIOTests.scala b/spark/spark-3.0/src/test/scala/org/apache/sedona/sql/geoparquetIOTests.scala
index c2d59756b..7a556cddd 100644
--- a/spark/spark-3.0/src/test/scala/org/apache/sedona/sql/geoparquetIOTests.scala
+++ b/spark/spark-3.0/src/test/scala/org/apache/sedona/sql/geoparquetIOTests.scala
@@ -141,7 +141,7 @@ class geoparquetIOTests extends TestBaseScala with BeforeAndAfterAll {
       ))
       val df = sparkSession.createDataFrame(testData.asJava, schema).repartition(1)
       val geoParquetSavePath = geoparquetoutputlocation + "/multi_geoms.parquet"
-      df.write.format("geoparquet").save(geoParquetSavePath)
+      df.write.format("geoparquet").mode("overwrite").save(geoParquetSavePath)
 
       // Find parquet files in geoParquetSavePath directory and validate their metadata
       validateGeoParquetMetadata(geoParquetSavePath) { geo =>
@@ -175,7 +175,7 @@ class geoparquetIOTests extends TestBaseScala with BeforeAndAfterAll {
       ))
       val df = sparkSession.createDataFrame(Collections.emptyList[Row](), schema)
       val geoParquetSavePath = geoparquetoutputlocation + "/empty.parquet"
-      df.write.format("geoparquet").save(geoParquetSavePath)
+      df.write.format("geoparquet").mode("overwrite").save(geoParquetSavePath)
       val df2 = sparkSession.read.format("geoparquet").load(geoParquetSavePath)
       assert(df2.schema.fields(1).dataType.isInstanceOf[GeometryUDT])
       assert(0 == df2.count())
@@ -189,6 +189,36 @@ class geoparquetIOTests extends TestBaseScala with BeforeAndAfterAll {
       }
     }
 
+    it("GeoParquet save should work with snake_case column names") {
+      val schema = StructType(Seq(
+        StructField("id", IntegerType, nullable = false),
+        StructField("geom_column", GeometryUDT, nullable = false)
+      ))
+      val df = sparkSession.createDataFrame(Collections.emptyList[Row](), schema)
+      val geoParquetSavePath = geoparquetoutputlocation + "/snake_case_column_name.parquet"
+      df.write.format("geoparquet").mode("overwrite").save(geoParquetSavePath)
+      val df2 = sparkSession.read.format("geoparquet").load(geoParquetSavePath)
+      val geomField = df2.schema.fields(1)
+      assert(geomField.name == "geom_column")
+      assert(geomField.dataType.isInstanceOf[GeometryUDT])
+      assert(0 == df2.count())
+    }
+
+    it("GeoParquet save should work with camelCase column names") {
+      val schema = StructType(Seq(
+        StructField("id", IntegerType, nullable = false),
+        StructField("geomColumn", GeometryUDT, nullable = false)
+      ))
+      val df = sparkSession.createDataFrame(Collections.emptyList[Row](), schema)
+      val geoParquetSavePath = geoparquetoutputlocation + "/camel_case_column_name.parquet"
+      df.write.format("geoparquet").mode("overwrite").save(geoParquetSavePath)
+      val df2 = sparkSession.read.format("geoparquet").load(geoParquetSavePath)
+      val geomField = df2.schema.fields(1)
+      assert(geomField.name == "geomColumn")
+      assert(geomField.dataType.isInstanceOf[GeometryUDT])
+      assert(0 == df2.count())
+    }
+
     it("GeoParquet save should write user specified version and crs to geo 
metadata") {
       val df = 
sparkSession.read.format("geoparquet").load(geoparquetdatalocation4)
       // This CRS is taken from 
https://proj.org/en/9.3/specifications/projjson.html#geographiccrs
diff --git a/spark/spark-3.4/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/GeoParquetWriteSupport.scala b/spark/spark-3.4/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/GeoParquetWriteSupport.scala
index 984edb5c0..02aec14a2 100644
--- a/spark/spark-3.4/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/GeoParquetWriteSupport.scala
+++ b/spark/spark-3.4/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/GeoParquetWriteSupport.scala
@@ -201,8 +201,7 @@ class GeoParquetWriteSupport extends WriteSupport[InternalRow] with Logging {
         columnName -> GeometryFieldMetaData("WKB", geometryTypes, bbox, crs)
       }.toMap
       val geoParquetMetadata = GeoParquetMetaData(geoParquetVersion, primaryColumn, columns)
-      implicit val formats: org.json4s.Formats = DefaultFormats
-      val geoParquetMetadataJson = compactJson(Extraction.decompose(geoParquetMetadata).underscoreKeys)
+      val geoParquetMetadataJson = GeoParquetMetaData.toJson(geoParquetMetadata)
       metadata.put("geo", geoParquetMetadataJson)
     }
     new FinalizedWriteContext(metadata)
diff --git a/spark/spark-3.4/src/test/scala/org/apache/sedona/sql/GeoParquetMetadataTests.scala b/spark/spark-3.4/src/test/scala/org/apache/sedona/sql/GeoParquetMetadataTests.scala
index 44359defb..03cf5ff91 100644
--- a/spark/spark-3.4/src/test/scala/org/apache/sedona/sql/GeoParquetMetadataTests.scala
+++ b/spark/spark-3.4/src/test/scala/org/apache/sedona/sql/GeoParquetMetadataTests.scala
@@ -14,8 +14,11 @@
 package org.apache.sedona.sql
 
 import org.apache.spark.sql.Row
+import org.apache.spark.sql.sedona_sql.UDT.GeometryUDT
+import org.apache.spark.sql.types.{IntegerType, StructField, StructType}
 import org.scalatest.BeforeAndAfterAll
 
+import java.util.Collections
 import scala.collection.JavaConverters._
 
 class GeoParquetMetadataTests extends TestBaseScala with BeforeAndAfterAll {
@@ -89,5 +92,25 @@ class GeoParquetMetadataTests extends TestBaseScala with BeforeAndAfterAll {
       val metadata = row.getJavaMap(row.fieldIndex("columns")).get("geometry").asInstanceOf[Row]
       assert(metadata.getAs[String]("crs") == "null")
     }
+
+    it("Read GeoParquet with snake_case geometry column name and camelCase 
column name") {
+      val schema = StructType(Seq(
+        StructField("id", IntegerType, nullable = false),
+        StructField("geom_column_1", GeometryUDT, nullable = false),
+        StructField("geomColumn2", GeometryUDT, nullable = false)
+      ))
+      val df = sparkSession.createDataFrame(Collections.emptyList[Row](), schema)
+      val geoParquetSavePath = geoparquetoutputlocation + "/gp_column_name_styles.parquet"
+      df.write.format("geoparquet").mode("overwrite").save(geoParquetSavePath)
+
+      val dfMeta = sparkSession.read.format("geoparquet.metadata").load(geoParquetSavePath)
+      val row = dfMeta.collect()(0)
+      val metadata = row.getJavaMap(row.fieldIndex("columns"))
+      assert(metadata.containsKey("geom_column_1"))
+      assert(!metadata.containsKey("geoColumn1"))
+      assert(metadata.containsKey("geomColumn2"))
+      assert(!metadata.containsKey("geom_column2"))
+      assert(!metadata.containsKey("geom_column_2"))
+    }
   }
 }
diff --git a/spark/spark-3.4/src/test/scala/org/apache/sedona/sql/geoparquetIOTests.scala b/spark/spark-3.4/src/test/scala/org/apache/sedona/sql/geoparquetIOTests.scala
index c2d59756b..7a556cddd 100644
--- a/spark/spark-3.4/src/test/scala/org/apache/sedona/sql/geoparquetIOTests.scala
+++ b/spark/spark-3.4/src/test/scala/org/apache/sedona/sql/geoparquetIOTests.scala
@@ -141,7 +141,7 @@ class geoparquetIOTests extends TestBaseScala with BeforeAndAfterAll {
       ))
       val df = sparkSession.createDataFrame(testData.asJava, schema).repartition(1)
       val geoParquetSavePath = geoparquetoutputlocation + "/multi_geoms.parquet"
-      df.write.format("geoparquet").save(geoParquetSavePath)
+      df.write.format("geoparquet").mode("overwrite").save(geoParquetSavePath)
 
       // Find parquet files in geoParquetSavePath directory and validate their metadata
       validateGeoParquetMetadata(geoParquetSavePath) { geo =>
@@ -175,7 +175,7 @@ class geoparquetIOTests extends TestBaseScala with BeforeAndAfterAll {
       ))
       val df = sparkSession.createDataFrame(Collections.emptyList[Row](), schema)
       val geoParquetSavePath = geoparquetoutputlocation + "/empty.parquet"
-      df.write.format("geoparquet").save(geoParquetSavePath)
+      df.write.format("geoparquet").mode("overwrite").save(geoParquetSavePath)
       val df2 = sparkSession.read.format("geoparquet").load(geoParquetSavePath)
       assert(df2.schema.fields(1).dataType.isInstanceOf[GeometryUDT])
       assert(0 == df2.count())
@@ -189,6 +189,36 @@ class geoparquetIOTests extends TestBaseScala with BeforeAndAfterAll {
       }
     }
 
+    it("GeoParquet save should work with snake_case column names") {
+      val schema = StructType(Seq(
+        StructField("id", IntegerType, nullable = false),
+        StructField("geom_column", GeometryUDT, nullable = false)
+      ))
+      val df = sparkSession.createDataFrame(Collections.emptyList[Row](), schema)
+      val geoParquetSavePath = geoparquetoutputlocation + "/snake_case_column_name.parquet"
+      df.write.format("geoparquet").mode("overwrite").save(geoParquetSavePath)
+      val df2 = sparkSession.read.format("geoparquet").load(geoParquetSavePath)
+      val geomField = df2.schema.fields(1)
+      assert(geomField.name == "geom_column")
+      assert(geomField.dataType.isInstanceOf[GeometryUDT])
+      assert(0 == df2.count())
+    }
+
+    it("GeoParquet save should work with camelCase column names") {
+      val schema = StructType(Seq(
+        StructField("id", IntegerType, nullable = false),
+        StructField("geomColumn", GeometryUDT, nullable = false)
+      ))
+      val df = sparkSession.createDataFrame(Collections.emptyList[Row](), schema)
+      val geoParquetSavePath = geoparquetoutputlocation + "/camel_case_column_name.parquet"
+      df.write.format("geoparquet").mode("overwrite").save(geoParquetSavePath)
+      val df2 = sparkSession.read.format("geoparquet").load(geoParquetSavePath)
+      val geomField = df2.schema.fields(1)
+      assert(geomField.name == "geomColumn")
+      assert(geomField.dataType.isInstanceOf[GeometryUDT])
+      assert(0 == df2.count())
+    }
+
     it("GeoParquet save should write user specified version and crs to geo 
metadata") {
       val df = 
sparkSession.read.format("geoparquet").load(geoparquetdatalocation4)
       // This CRS is taken from 
https://proj.org/en/9.3/specifications/projjson.html#geographiccrs
diff --git a/spark/spark-3.5/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/GeoParquetWriteSupport.scala b/spark/spark-3.5/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/GeoParquetWriteSupport.scala
index aa81f8725..5e47bec78 100644
--- a/spark/spark-3.5/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/GeoParquetWriteSupport.scala
+++ b/spark/spark-3.5/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/GeoParquetWriteSupport.scala
@@ -201,8 +201,7 @@ class GeoParquetWriteSupport extends WriteSupport[InternalRow] with Logging {
         columnName -> GeometryFieldMetaData("WKB", geometryTypes, bbox, crs)
       }.toMap
       val geoParquetMetadata = GeoParquetMetaData(geoParquetVersion, primaryColumn, columns)
-      implicit val formats: org.json4s.Formats = DefaultFormats
-      val geoParquetMetadataJson = compactJson(Extraction.decompose(geoParquetMetadata).underscoreKeys)
+      val geoParquetMetadataJson = GeoParquetMetaData.toJson(geoParquetMetadata)
       metadata.put("geo", geoParquetMetadataJson)
     }
     new FinalizedWriteContext(metadata)
diff --git a/spark/spark-3.5/src/test/scala/org/apache/sedona/sql/GeoParquetMetadataTests.scala b/spark/spark-3.5/src/test/scala/org/apache/sedona/sql/GeoParquetMetadataTests.scala
index 44359defb..03cf5ff91 100644
--- a/spark/spark-3.5/src/test/scala/org/apache/sedona/sql/GeoParquetMetadataTests.scala
+++ b/spark/spark-3.5/src/test/scala/org/apache/sedona/sql/GeoParquetMetadataTests.scala
@@ -14,8 +14,11 @@
 package org.apache.sedona.sql
 
 import org.apache.spark.sql.Row
+import org.apache.spark.sql.sedona_sql.UDT.GeometryUDT
+import org.apache.spark.sql.types.{IntegerType, StructField, StructType}
 import org.scalatest.BeforeAndAfterAll
 
+import java.util.Collections
 import scala.collection.JavaConverters._
 
 class GeoParquetMetadataTests extends TestBaseScala with BeforeAndAfterAll {
@@ -89,5 +92,25 @@ class GeoParquetMetadataTests extends TestBaseScala with BeforeAndAfterAll {
       val metadata = row.getJavaMap(row.fieldIndex("columns")).get("geometry").asInstanceOf[Row]
       assert(metadata.getAs[String]("crs") == "null")
     }
+
+    it("Read GeoParquet with snake_case geometry column name and camelCase 
column name") {
+      val schema = StructType(Seq(
+        StructField("id", IntegerType, nullable = false),
+        StructField("geom_column_1", GeometryUDT, nullable = false),
+        StructField("geomColumn2", GeometryUDT, nullable = false)
+      ))
+      val df = sparkSession.createDataFrame(Collections.emptyList[Row](), schema)
+      val geoParquetSavePath = geoparquetoutputlocation + "/gp_column_name_styles.parquet"
+      df.write.format("geoparquet").mode("overwrite").save(geoParquetSavePath)
+
+      val dfMeta = sparkSession.read.format("geoparquet.metadata").load(geoParquetSavePath)
+      val row = dfMeta.collect()(0)
+      val metadata = row.getJavaMap(row.fieldIndex("columns"))
+      assert(metadata.containsKey("geom_column_1"))
+      assert(!metadata.containsKey("geoColumn1"))
+      assert(metadata.containsKey("geomColumn2"))
+      assert(!metadata.containsKey("geom_column2"))
+      assert(!metadata.containsKey("geom_column_2"))
+    }
   }
 }
diff --git a/spark/spark-3.5/src/test/scala/org/apache/sedona/sql/geoparquetIOTests.scala b/spark/spark-3.5/src/test/scala/org/apache/sedona/sql/geoparquetIOTests.scala
index c2d59756b..7a556cddd 100644
--- a/spark/spark-3.5/src/test/scala/org/apache/sedona/sql/geoparquetIOTests.scala
+++ b/spark/spark-3.5/src/test/scala/org/apache/sedona/sql/geoparquetIOTests.scala
@@ -141,7 +141,7 @@ class geoparquetIOTests extends TestBaseScala with BeforeAndAfterAll {
       ))
       val df = sparkSession.createDataFrame(testData.asJava, schema).repartition(1)
       val geoParquetSavePath = geoparquetoutputlocation + "/multi_geoms.parquet"
-      df.write.format("geoparquet").save(geoParquetSavePath)
+      df.write.format("geoparquet").mode("overwrite").save(geoParquetSavePath)
 
       // Find parquet files in geoParquetSavePath directory and validate their metadata
       validateGeoParquetMetadata(geoParquetSavePath) { geo =>
@@ -175,7 +175,7 @@ class geoparquetIOTests extends TestBaseScala with BeforeAndAfterAll {
       ))
       val df = sparkSession.createDataFrame(Collections.emptyList[Row](), schema)
       val geoParquetSavePath = geoparquetoutputlocation + "/empty.parquet"
-      df.write.format("geoparquet").save(geoParquetSavePath)
+      df.write.format("geoparquet").mode("overwrite").save(geoParquetSavePath)
       val df2 = sparkSession.read.format("geoparquet").load(geoParquetSavePath)
       assert(df2.schema.fields(1).dataType.isInstanceOf[GeometryUDT])
       assert(0 == df2.count())
@@ -189,6 +189,36 @@ class geoparquetIOTests extends TestBaseScala with BeforeAndAfterAll {
       }
     }
 
+    it("GeoParquet save should work with snake_case column names") {
+      val schema = StructType(Seq(
+        StructField("id", IntegerType, nullable = false),
+        StructField("geom_column", GeometryUDT, nullable = false)
+      ))
+      val df = sparkSession.createDataFrame(Collections.emptyList[Row](), schema)
+      val geoParquetSavePath = geoparquetoutputlocation + "/snake_case_column_name.parquet"
+      df.write.format("geoparquet").mode("overwrite").save(geoParquetSavePath)
+      val df2 = sparkSession.read.format("geoparquet").load(geoParquetSavePath)
+      val geomField = df2.schema.fields(1)
+      assert(geomField.name == "geom_column")
+      assert(geomField.dataType.isInstanceOf[GeometryUDT])
+      assert(0 == df2.count())
+    }
+
+    it("GeoParquet save should work with camelCase column names") {
+      val schema = StructType(Seq(
+        StructField("id", IntegerType, nullable = false),
+        StructField("geomColumn", GeometryUDT, nullable = false)
+      ))
+      val df = sparkSession.createDataFrame(Collections.emptyList[Row](), schema)
+      val geoParquetSavePath = geoparquetoutputlocation + "/camel_case_column_name.parquet"
+      df.write.format("geoparquet").mode("overwrite").save(geoParquetSavePath)
+      val df2 = sparkSession.read.format("geoparquet").load(geoParquetSavePath)
+      val geomField = df2.schema.fields(1)
+      assert(geomField.name == "geomColumn")
+      assert(geomField.dataType.isInstanceOf[GeometryUDT])
+      assert(0 == df2.count())
+    }
+
     it("GeoParquet save should write user specified version and crs to geo 
metadata") {
       val df = 
sparkSession.read.format("geoparquet").load(geoparquetdatalocation4)
       // This CRS is taken from 
https://proj.org/en/9.3/specifications/projjson.html#geographiccrs
