This is an automated email from the ASF dual-hosted git repository.

jiayu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/sedona.git


The following commit(s) were added to refs/heads/master by this push:
     new e4c7d7bce [SEDONA-562] Add a native DataFrame based GeoJSON reader and 
writer (#1432)
e4c7d7bce is described below

commit e4c7d7bcedca7bae1a4da81ec99336fbb75ca556
Author: Jia Yu <[email protected]>
AuthorDate: Tue May 28 00:26:13 2024 -0700

    [SEDONA-562] Add a native DataFrame based GeoJSON reader and writer (#1432)
    
    * Push the code
    
    * Minor tweak
---
 docs/api/sql/Constructor.md                        |   3 +
 docs/api/sql/Function.md                           |   3 +
 docs/tutorial/rdd.md                               |   8 +-
 docs/tutorial/sql.md                               | 205 +++++++++++---
 ...org.apache.spark.sql.sources.DataSourceRegister |   1 +
 .../sedona_sql/io/geojson/GeoJSONFileFormat.scala  | 186 +++++++++++++
 .../io/geojson/GeoJSONJacksonGenerator.scala       | 301 +++++++++++++++++++++
 .../io/geojson/GeoJSONOutputWriter.scala           |  60 ++++
 .../sql/sedona_sql/io/geojson/GeoJSONUtils.scala   | 117 ++++++++
 .../sedona_sql/io/geojson/SparkCompatUtil.scala    | 148 ++++++++++
 .../src/test/resources/geojson/core-item.json      |   1 +
 .../geojson/geojson_feature-collection.json        |  76 ++++++
 spark/common/src/test/resources/geojson/test1.json |  39 +++
 .../common/src/test/resources/geojson/test12.json  |  41 +++
 spark/common/src/test/resources/geojson/test2.json |  21 ++
 .../org/apache/sedona/sql/geojsonIOTests.scala     | 224 +++++++++++++++
 16 files changed, 1401 insertions(+), 33 deletions(-)

diff --git a/docs/api/sql/Constructor.md b/docs/api/sql/Constructor.md
index 6498e5eb4..efe6e62ca 100644
--- a/docs/api/sql/Constructor.md
+++ b/docs/api/sql/Constructor.md
@@ -116,6 +116,9 @@ POLYGON ((0.703125 0.87890625, 0.703125 1.0546875, 
1.0546875 1.0546875, 1.054687
 
 ## ST_GeomFromGeoJSON
 
+!!!note
+       This method is not recommended. Please use [Sedona GeoJSON data 
source](../../tutorial/sql.md#load-geojson-data) to read GeoJSON files.
+
 Introduction: Construct a Geometry from GeoJson
 
 Format: `ST_GeomFromGeoJSON (GeoJson: String)`
diff --git a/docs/api/sql/Function.md b/docs/api/sql/Function.md
index 7406fe133..814c9c376 100644
--- a/docs/api/sql/Function.md
+++ b/docs/api/sql/Function.md
@@ -344,6 +344,9 @@ POINT ZM(1 1 1 1)
 
 ## ST_AsGeoJSON
 
+!!!note
+       This method is not recommended. Please use [Sedona GeoJSON data 
source](../../tutorial/sql.md#save-as-geojson) to write GeoJSON files.
+
 Introduction: Return the [GeoJSON](https://geojson.org/) string representation 
of a geometry
 
 Format: `ST_AsGeoJSON (A: Geometry)`
diff --git a/docs/tutorial/rdd.md b/docs/tutorial/rdd.md
index 27cb25e54..65fe0c859 100644
--- a/docs/tutorial/rdd.md
+++ b/docs/tutorial/rdd.md
@@ -76,6 +76,9 @@ Use the following code to create a SpatialRDD
 
 #### From GeoJSON
 
+!!!note
+       Reading GeoJSON using SpatialRDD is not recommended. Please use [Sedona 
SQL and DataFrame API](sql.md#load-geojson-data) to read GeoJSON files.
+
 Geometries in GeoJSON is similar to WKT/WKB. However, a GeoJSON file must be 
beaked into multiple lines.
 
 Suppose we have a `polygon.json` GeoJSON file at Path `/Download/polygon.json` 
as follows:
@@ -200,7 +203,7 @@ var spatialRDD = Adapter.toSpatialRdd(spatialDf, "checkin")
 
 "checkin" is the name of the geometry column
 
-For WKT/WKB/GeoJSON data, please use ==ST_GeomFromWKT / ST_GeomFromWKB / 
ST_GeomFromGeoJSON== instead.
+For WKT/WKB data, please use ==ST_GeomFromWKT / ST_GeomFromWKB== instead.
 
 ## Transform the Coordinate Reference System
 
@@ -978,6 +981,9 @@ objectRDD.saveAsWKB("hdfs://PATH")
 
 #### Save to distributed GeoJSON text file
 
+!!!note
+       Saving GeoJSON using SpatialRDD is not recommended. Please use [Sedona 
SQL and DataFrame API](sql.md#save-as-geojson) to write GeoJSON files.
+
 Use the following code to save an SpatialRDD as a distributed GeoJSON text 
file:
 
 ```scala
diff --git a/docs/tutorial/sql.md b/docs/tutorial/sql.md
index 6d28998a2..f183a1270 100644
--- a/docs/tutorial/sql.md
+++ b/docs/tutorial/sql.md
@@ -292,48 +292,176 @@ root
 !!!note
        SedonaSQL provides lots of functions to create a Geometry column, 
please read [SedonaSQL constructor API](../api/sql/Constructor.md).
 
-## Load GeoJSON using Spark JSON Data Source
+## Load GeoJSON Data
+
+Since `v1.6.1`, Sedona supports reading GeoJSON files using the `geojson` data 
source. It is designed to handle JSON files that use [GeoJSON 
format](https://datatracker.ietf.org/doc/html/rfc7946) for their geometries.
+
+This includes SpatioTemporal Asset Catalog (STAC) files, GeoJSON features, 
GeoJSON feature collections and other variations.
+The key functionality lies in the way 'geometry' fields are processed: these 
are specifically read as Sedona's `GeometryUDT` type, ensuring integration with 
Sedona's suite of spatial functions.
+
+### Key features
+
+- Broad Support: The reader and writer are versatile, supporting all 
GeoJSON-formatted files, including STAC files, feature collections, and more.
+- Geometry Transformation: When reading, fields named 'geometry' are 
automatically converted from GeoJSON format to Sedona's `GeometryUDT` type and 
vice versa when writing.
+
+### Load MultiLine GeoJSON FeatureCollection
+
+Suppose we have a GeoJSON FeatureCollection file as follows.
+This entire file is considered as a single GeoJSON FeatureCollection object.
+Multiline format is preferable for scenarios where files need to be 
human-readable or manually edited.
+
+```json
+{ "type": "FeatureCollection",
+    "features": [
+      { "type": "Feature",
+        "geometry": {"type": "Point", "coordinates": [102.0, 0.5]},
+        "properties": {"prop0": "value0"}
+        },
+      { "type": "Feature",
+        "geometry": {
+          "type": "LineString",
+          "coordinates": [
+            [102.0, 0.0], [103.0, 1.0], [104.0, 0.0], [105.0, 1.0]
+            ]
+          },
+        "properties": {
+          "prop0": "value1",
+          "prop1": 0.0
+          }
+        },
+      { "type": "Feature",
+         "geometry": {
+           "type": "Polygon",
+           "coordinates": [
+             [ [100.0, 0.0], [101.0, 0.0], [101.0, 1.0],
+               [100.0, 1.0], [100.0, 0.0] ]
+             ]
+         },
+         "properties": {
+           "prop0": "value2",
+           "prop1": {"this": "that"}
+           }
+         }
+       ]
+}
+```
+
+Set the `multiLine` option to `True` to read multiline GeoJSON files.
+
+=== "Python"
+
+    ```python
+    df = (sedona.read.format("geojson").option("multiLine", "true").load("PATH/TO/MYFILE.json")
+     .selectExpr("explode(features) as features") # Explode the envelope to get one feature per row.
+     .select("features.*") # Unpack the features struct.
+     .withColumn("prop0", f.expr("properties['prop0']")).drop("properties").drop("type"))
+
+    df.show()
+    df.printSchema()
+    ```
+
+=== "Scala"
+
+    ```scala
+    val df = sedona.read.format("geojson").option("multiLine", 
"true").load("PATH/TO/MYFILE.json")
+    val parsedDf = df.selectExpr("explode(features) as 
features").select("features.*")
+            .withColumn("prop0", 
expr("properties['prop0']")).drop("properties").drop("type")
+
+    parsedDf.show()
+    parsedDf.printSchema()
+    ```
+
+=== "Java"
+
+    ```java
+    Dataset<Row> df = sedona.read().format("geojson").option("multiLine", "true").load("PATH/TO/MYFILE.json")
+     .selectExpr("explode(features) as features") // Explode the envelope to get one feature per row.
+     .select("features.*") // Unpack the features struct.
+     .withColumn("prop0", expr("properties['prop0']")).drop("properties").drop("type");
+
+    df.show();
+    df.printSchema();
+    ```
+
+The output is as follows:
+
+```
++--------------------+------+
+|            geometry| prop0|
++--------------------+------+
+|     POINT (102 0.5)|value0|
+|LINESTRING (102 0...|value1|
+|POLYGON ((100 0, ...|value2|
++--------------------+------+
+
+root
+ |-- geometry: geometry (nullable = false)
+ |-- prop0: string (nullable = true)
+
+```
+
+### Load Single Line GeoJSON Features
+
+Suppose we have a single-line GeoJSON Features dataset as follows. Each line 
is a single GeoJSON Feature.
+This format is efficient for processing large datasets where each line is a 
separate, self-contained GeoJSON object.
+
+```json
+{"type":"Feature","geometry":{"type":"Point","coordinates":[102.0,0.5]},"properties":{"prop0":"value0"}}
+{"type":"Feature","geometry":{"type":"LineString","coordinates":[[102.0,0.0],[103.0,1.0],[104.0,0.0],[105.0,1.0]]},"properties":{"prop0":"value1"}}
+{"type":"Feature","geometry":{"type":"Polygon","coordinates":[[[100.0,0.0],[101.0,0.0],[101.0,1.0],[100.0,1.0],[100.0,0.0]]]},"properties":{"prop0":"value2"}}
+```
 
-Spark SQL's built-in JSON data source supports reading GeoJSON data.
-To ensure proper parsing of the geometry property, we can define a schema with 
the geometry property set to type 'string'.
-This prevents Spark from interpreting the property and allows us to use the 
ST_GeomFromGeoJSON function for accurate geometry parsing.
+By default, when the `multiLine` option is not specified, Sedona reads a GeoJSON file in single-line mode, where each line is a separate GeoJSON object.
+
+=== "Python"
+
+       ```python
+       df = (sedona.read.format("geojson").load("PATH/TO/MYFILE.json")
+          .withColumn("prop0", f.expr("properties['prop0']")).drop("properties").drop("type"))
+
+       df.show()
+       df.printSchema()
+       ```
 
 === "Scala"
 
        ```scala
-       val schema = "type string, crs string, totalFeatures long, features 
array<struct<type string, geometry string, properties map<string, string>>>"
-       sedona.read.schema(schema).json(geojson_path)
-               .selectExpr("explode(features) as features") // Explode the 
envelope to get one feature per row.
-               .select("features.*") // Unpack the features struct.
-               .withColumn("geometry", expr("ST_GeomFromGeoJSON(geometry)")) 
// Convert the geometry string.
-               .printSchema()
+       val df = sedona.read.format("geojson").load("PATH/TO/MYFILE.json")
+          .withColumn("prop0", 
expr("properties['prop0']")).drop("properties").drop("type")
+
+       df.show()
+       df.printSchema()
        ```
 
 === "Java"
 
        ```java
-       String schema = "type string, crs string, totalFeatures long, features 
array<struct<type string, geometry string, properties map<string, string>>>";
-       sedona.read.schema(schema).json(geojson_path)
-               .selectExpr("explode(features) as features") // Explode the 
envelope to get one feature per row.
-               .select("features.*") // Unpack the features struct.
-               .withColumn("geometry", expr("ST_GeomFromGeoJSON(geometry)")) 
// Convert the geometry string.
-               .printSchema();
+       Dataset<Row> df = sedona.read().format("geojson").load("PATH/TO/MYFILE.json")
+          .withColumn("prop0", expr("properties['prop0']")).drop("properties").drop("type");
+
+       df.show();
+       df.printSchema();
        ```
 
-=== "Python"
+The output is as follows:
 
-       ```python
-       schema = "type string, crs string, totalFeatures long, features 
array<struct<type string, geometry string, properties map<string, string>>>";
-       (sedona.read.json(geojson_path, schema=schema)
-               .selectExpr("explode(features) as features") # Explode the 
envelope to get one feature per row.
-               .select("features.*") # Unpack the features struct.
-               .withColumn("geometry", f.expr("ST_GeomFromGeoJSON(geometry)")) 
# Convert the geometry string.
-               .printSchema())
-       ```
+```
++--------------------+------+
+|            geometry| prop0|
++--------------------+------+
+|     POINT (102 0.5)|value0|
+|LINESTRING (102 0...|value1|
+|POLYGON ((100 0, ...|value2|
++--------------------+------+
+
+root
+ |-- geometry: geometry (nullable = false)
+ |-- prop0: string (nullable = true)
+```
 
-## Load Shapefile and GeoJSON using SpatialRDD
+## Load Shapefile using SpatialRDD
 
-Shapefile and GeoJSON can be loaded by SpatialRDD and converted to DataFrame 
using Adapter. Please read [Load 
SpatialRDD](rdd.md#create-a-generic-spatialrdd) and [DataFrame <-> 
RDD](#convert-between-dataframe-and-spatialrdd).
+Shapefile can be loaded by SpatialRDD and converted to DataFrame using 
Adapter. Please read [Load SpatialRDD](rdd.md#create-a-generic-spatialrdd) and 
[DataFrame <-> RDD](#convert-between-dataframe-and-spatialrdd).
 
 ## Load GeoParquet
 
@@ -959,8 +1087,21 @@ SELECT ST_AsText(countyshape)
 FROM polygondf
 ```
 
-!!!note
-       ST_AsGeoJSON is also available. We would like to invite you to 
contribute more functions
+## Save as GeoJSON
+
+Since `v1.6.1`, the GeoJSON data source in Sedona can be used to save a 
Spatial DataFrame to a single-line JSON file, with geometries written in 
GeoJSON format.
+
+```sparksql
+df.write.format("geojson").save("YOUR/PATH.json")
+```
+
+The structure of the generated file will be like this:
+
+```json
+{"type":"Feature","geometry":{"type":"Point","coordinates":[102.0,0.5]},"properties":{"prop0":"value0"}}
+{"type":"Feature","geometry":{"type":"LineString","coordinates":[[102.0,0.0],[103.0,1.0],[104.0,0.0],[105.0,1.0]]},"properties":{"prop0":"value1"}}
+{"type":"Feature","geometry":{"type":"Polygon","coordinates":[[[100.0,0.0],[101.0,0.0],[101.0,1.0],[100.0,1.0],[100.0,0.0]]]},"properties":{"prop0":"value2"}}
+```
 
 ## Save GeoParquet
 
@@ -996,9 +1137,9 @@ df.write.format("geoparquet")
 
 The value of `geoparquet.crs` and `geoparquet.crs.<column_name>` can be one of 
the following:
 
-* `"null"`: Explicitly setting `crs` field to `null`. This is the default 
behavior.
-* `""` (empty string): Omit the `crs` field. This implies that the CRS is 
[OGC:CRS84](https://www.opengis.net/def/crs/OGC/1.3/CRS84) for CRS-aware 
implementations.
-* `"{...}"` (PROJJSON string): The `crs` field will be set as the PROJJSON 
object representing the Coordinate Reference System (CRS) of the geometry. You 
can find the PROJJSON string of a specific CRS from here: https://epsg.io/ 
(click the JSON option at the bottom of the page). You can also customize your 
PROJJSON string as needed.
+- `"null"`: Explicitly setting `crs` field to `null`. This is the default 
behavior.
+- `""` (empty string): Omit the `crs` field. This implies that the CRS is 
[OGC:CRS84](https://www.opengis.net/def/crs/OGC/1.3/CRS84) for CRS-aware 
implementations.
+- `"{...}"` (PROJJSON string): The `crs` field will be set as the PROJJSON 
object representing the Coordinate Reference System (CRS) of the geometry. You 
can find the PROJJSON string of a specific CRS from here: https://epsg.io/ 
(click the JSON option at the bottom of the page). You can also customize your 
PROJJSON string as needed.
 
 Please note that Sedona currently cannot set/get a projjson string to/from a 
CRS. Its geoparquet reader will ignore the projjson metadata and you will have 
to set your CRS via [`ST_SetSRID`](../api/sql/Function.md#st_setsrid) after 
reading the file.
 Its geoparquet writer will not leverage the SRID field of a geometry so you 
will have to always set the `geoparquet.crs` option manually when writing the 
file, if you want to write a meaningful CRS field.
diff --git 
a/spark/common/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister
 
b/spark/common/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister
index 23e950440..a71664cc3 100644
--- 
a/spark/common/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister
+++ 
b/spark/common/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister
@@ -1 +1,2 @@
 org.apache.spark.sql.sedona_sql.io.raster.RasterFileFormat
+org.apache.spark.sql.sedona_sql.io.geojson.GeoJSONFileFormat
diff --git 
a/spark/common/src/main/scala/org/apache/spark/sql/sedona_sql/io/geojson/GeoJSONFileFormat.scala
 
b/spark/common/src/main/scala/org/apache/spark/sql/sedona_sql/io/geojson/GeoJSONFileFormat.scala
new file mode 100644
index 000000000..894e68786
--- /dev/null
+++ 
b/spark/common/src/main/scala/org/apache/spark/sql/sedona_sql/io/geojson/GeoJSONFileFormat.scala
@@ -0,0 +1,186 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.sedona_sql.io.geojson
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.{FileStatus, Path}
+import org.apache.hadoop.mapreduce.{Job, TaskAttemptContext}
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.ExprUtils
+import org.apache.spark.sql.catalyst.json._
+import org.apache.spark.sql.catalyst.util.CompressionCodecs
+import org.apache.spark.sql.execution.datasources._
+import org.apache.spark.sql.execution.datasources.json.JsonDataSource
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.sedona_sql.UDT.{GeometryUDT, RasterUDT}
+import org.apache.spark.sql.sources._
+import org.apache.spark.sql.types._
+import org.apache.spark.util.SerializableConfiguration
+
+/**
+ * This is taken from 
[[org.apache.spark.sql.execution.datasources.json.JsonFileFormat]] with slight 
modification to
+ * support GeoJSON and read/write geometry values.
+ */
+class GeoJSONFileFormat extends TextBasedFileFormat with DataSourceRegister {
+  override val shortName: String = "geojson"
+
+  /**
+   * Reports whether a file may be split across tasks.
+   *
+   * A file is splittable only when both the JSON data source selected by the parsed read
+   * options and the parent text-based format agree that it is.
+   */
+  override def isSplitable(
+    sparkSession: SparkSession,
+    options: Map[String, String],
+    path: Path): Boolean = {
+    val sessionConf = sparkSession.sessionState.conf
+    val readOptions = new JSONOptionsInRead(
+      options,
+      sessionConf.sessionLocalTimeZone,
+      sessionConf.columnNameOfCorruptRecord)
+    // `&&` short-circuits: the parent check only runs when the data source itself is splittable.
+    JsonDataSource(readOptions).isSplitable && super.isSplitable(sparkSession, options, path)
+  }
+
+  /**
+   * Infers the schema using Spark's JSON schema inference and then passes the result through
+   * `GeoJSONUtils.updateGeometrySchema` with `GeometryUDT` as the geometry type.
+   *
+   * @return the adjusted schema, or `None` when JSON inference produces no schema
+   */
+  override def inferSchema(
+    sparkSession: SparkSession,
+    options: Map[String, String],
+    files: Seq[FileStatus]): Option[StructType] = {
+    val sessionConf = sparkSession.sessionState.conf
+    val readOptions = new JSONOptionsInRead(
+      options,
+      sessionConf.sessionLocalTimeZone,
+      sessionConf.columnNameOfCorruptRecord)
+
+    for {
+      // Reuse the stock JSON inference for everything except the geometry type substitution.
+      inferred <- JsonDataSource(readOptions).inferSchema(sparkSession, files, readOptions)
+    } yield StructType(GeoJSONUtils.updateGeometrySchema(inferred, GeometryUDT))
+  }
+
+  /**
+   * Validates the write configuration and returns the factory that creates one
+   * `GeoJSONOutputWriter` per output file.
+   *
+   * The primary geometry column is read from the `geometry.column` option (default
+   * `geometry`). The name is split on `.` before the lookup, so a dotted name addresses a
+   * nested field.
+   *
+   * @throws IllegalArgumentException if the geometry column cannot be found in `dataSchema`,
+   *                                  or if its type does not accept `GeometryUDT`
+   */
+  override def prepareWrite(
+    sparkSession: SparkSession,
+    job: Job,
+    options: Map[String, String],
+    dataSchema: StructType): OutputWriterFactory = {
+
+    val hadoopConf = job.getConfiguration
+    val sessionConf = sparkSession.sessionState.conf
+    val writeOptions = new JSONOptions(
+      options,
+      sessionConf.sessionLocalTimeZone,
+      sessionConf.columnNameOfCorruptRecord)
+    // Forward the requested compression codec, if any, to the Hadoop job configuration.
+    writeOptions.compressionCodec.foreach { codecName =>
+      CompressionCodecs.setCodecConfiguration(hadoopConf, codecName)
+    }
+
+    val geometryColumnName = options.getOrElse("geometry.column", "geometry")
+    val geometryField = SparkCompatUtil.findNestedField(
+      dataSchema, geometryColumnName.split('.'), resolver = SQLConf.get.resolver)
+    geometryField match {
+      case None =>
+        throw new IllegalArgumentException(s"Column $geometryColumnName not found in the schema")
+      case Some(StructField(_, dataType, _, _)) if !dataType.acceptsType(GeometryUDT) =>
+        throw new IllegalArgumentException(s"$geometryColumnName is not a geometry column")
+      case Some(_) => // The column exists and holds geometries; nothing else to validate.
+    }
+
+    new OutputWriterFactory {
+      override def newInstance(
+        path: String,
+        dataSchema: StructType,
+        context: TaskAttemptContext): OutputWriter =
+        new GeoJSONOutputWriter(path, writeOptions, dataSchema, geometryColumnName, context)
+
+      // Output files are named `*.json` followed by the codec-specific extension, if any.
+      override def getFileExtension(context: TaskAttemptContext): String =
+        ".json" + CodecStreams.getCompressionExtension(context)
+    }
+  }
+
+  /**
+   * Builds the per-file reader function.
+   *
+   * Files are parsed with a schema produced by `GeoJSONUtils.updateGeometrySchema(_, StringType)`,
+   * and every parsed row is then passed through `GeoJSONUtils.convertGeoJsonToGeometry`.
+   *
+   * @throws IllegalArgumentException if the only requested column is the internal corrupt
+   *                                  record column
+   */
+  override def buildReader(
+    sparkSession: SparkSession,
+    dataSchema: StructType,
+    partitionSchema: StructType,
+    requiredSchema: StructType,
+    filters: Seq[Filter],
+    options: Map[String, String],
+    hadoopConf: Configuration): PartitionedFile => Iterator[InternalRow] = {
+
+    // The Hadoop configuration is broadcast so the returned closure can use it on executors.
+    val broadcastedHadoopConf =
+      sparkSession.sparkContext.broadcast(new SerializableConfiguration(hadoopConf))
+
+    val sessionConf = sparkSession.sessionState.conf
+    val readOptions = new JSONOptionsInRead(
+      options,
+      sessionConf.sessionLocalTimeZone,
+      sessionConf.columnNameOfCorruptRecord)
+
+    // The corrupt record column is not part of the parsed data, so it is dropped here.
+    val actualSchema =
+      StructType(requiredSchema.filterNot(_.name == readOptions.columnNameOfCorruptRecord))
+    ExprUtils.verifyColumnNameOfCorruptRecord(dataSchema, readOptions.columnNameOfCorruptRecord)
+
+    val alteredSchema = GeoJSONUtils.updateGeometrySchema(actualSchema, StringType)
+
+    val onlyCorruptRecordRequested =
+      requiredSchema.length == 1 &&
+        requiredSchema.head.name == readOptions.columnNameOfCorruptRecord
+    if (onlyCorruptRecordRequested) {
+      throw new IllegalArgumentException(
+        "referenced columns only include the internal corrupt record column, this is not allowed")
+    }
+
+    (file: PartitionedFile) => {
+      val parser = SparkCompatUtil.constructJacksonParser(
+        alteredSchema,
+        readOptions,
+        allowArrayAsStructs = true)
+      // NOTE(review): `actualSchema` (without the corrupt record column) is what is handed to
+      // `readFile` — confirm this is the intended behavior when that column is requested.
+      JsonDataSource(readOptions)
+        .readFile(broadcastedHadoopConf.value.value, file, parser, actualSchema)
+        .map(row => GeoJSONUtils.convertGeoJsonToGeometry(row, alteredSchema))
+    }
+  }
+
+
+  override def toString: String = "GEOJSON"
+
+  override def hashCode(): Int = getClass.hashCode()
+
+  override def equals(other: Any): Boolean = 
other.isInstanceOf[GeoJSONFileFormat]
+
+  /**
+   * Tells whether values of `dataType` can be handled by this format.
+   *
+   * Atomic and null types are supported, containers are supported when all of their
+   * component types are, `GeometryUDT` is supported, `RasterUDT` is rejected, and any other
+   * UDT is judged by its underlying SQL type.
+   */
+  override def supportDataType(dataType: DataType): Boolean = dataType match {
+    case _: AtomicType | _: NullType => true
+    case struct: StructType => struct.fields.forall(field => supportDataType(field.dataType))
+    case ArrayType(elementType, _) => supportDataType(elementType)
+    case MapType(keyType, valueType, _) => supportDataType(keyType) && supportDataType(valueType)
+    // The two Sedona UDTs must be matched before the generic UDT case below.
+    case GeometryUDT => true
+    case RasterUDT => false
+    case udt: UserDefinedType[_] => supportDataType(udt.sqlType)
+    case _ => false
+  }
+}
diff --git 
a/spark/common/src/main/scala/org/apache/spark/sql/sedona_sql/io/geojson/GeoJSONJacksonGenerator.scala
 
b/spark/common/src/main/scala/org/apache/spark/sql/sedona_sql/io/geojson/GeoJSONJacksonGenerator.scala
new file mode 100644
index 000000000..1c8a84926
--- /dev/null
+++ 
b/spark/common/src/main/scala/org/apache/spark/sql/sedona_sql/io/geojson/GeoJSONJacksonGenerator.scala
@@ -0,0 +1,301 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.sedona_sql.io.geojson
+
+import com.fasterxml.jackson.core._
+import com.fasterxml.jackson.core.util.DefaultPrettyPrinter
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.SpecializedGetters
+import org.apache.spark.sql.catalyst.json.JSONOptions
+import org.apache.spark.sql.catalyst.util.LegacyDateFormats.FAST_DATE_FORMAT
+import org.apache.spark.sql.catalyst.util._
+import org.apache.spark.sql.sedona_sql.UDT.GeometryUDT
+import 
org.apache.spark.sql.sedona_sql.io.geojson.GeoJSONUtils.geometryToGeoJson
+import org.apache.spark.sql.types._
+import org.locationtech.jts.io.WKTWriter
+
+import java.io.Writer
+
+/**
+ * `GeoJSONJacksonGenerator` is taken from [[JacksonGenerator]] with slight modifications to write rows as
+ * GeoJSON Feature objects. It handles geometry columns properly and wraps all other columns in the `properties` field.
+ */
+private[sql] class GeoJSONJacksonGenerator(
+  schema: StructType,
+  geometryColumnName: String,
+  writer: Writer,
+  options: JSONOptions) {
+  // A `ValueWriter` is responsible for writing a field of an `InternalRow` to 
appropriate
+  // JSON data. Here we are using `SpecializedGetters` rather than 
`InternalRow` so that
+  // we can directly access data in `ArrayData` without the help of 
`SpecificMutableRow`.
+  private type ValueWriter = (SpecializedGetters, Int) => Unit
+
+  // `ValueWriter`s for all fields of the schema
+  private lazy val rootFieldWriters: Array[ValueWriter] =
+    schema.map(field => makeWriter(field.dataType, field.name)).toArray
+
+  // If there is a field named `properties` and it is a struct or map, we will 
write all fields to top-level
+  // of the GeoJSON, otherwise we'll wrap all fields in `properties` field.
+  private val hasPropertiesField = schema.fields.exists { field =>
+    field.name == "properties" && (field.dataType.isInstanceOf[StructType] || 
field.dataType.isInstanceOf[MapType])
+  }
+
+  private val gen = {
+    val generator = new 
JsonFactory().createGenerator(writer).setRootValueSeparator(null)
+    if (options.pretty) {
+      generator.setPrettyPrinter(new DefaultPrettyPrinter(""))
+    }
+    generator
+  }
+
+  private val lineSeparator: String = options.lineSeparatorInWrite
+
+  private val timestampFormatter = SparkCompatUtil.constructTimestampFormatter(
+    options,
+    options.zoneId,
+    options.locale,
+    legacyFormat = FAST_DATE_FORMAT,
+    isParsing = false)
+  private val dateFormatter = SparkCompatUtil.constructDateFormatter(
+    options,
+    options.zoneId,
+    options.locale,
+    legacyFormat = FAST_DATE_FORMAT,
+    isParsing = false)
+
+  private val wktWriter = new WKTWriter()
+  private var currentGeoJsonString: Option[String] = None
+
+  /**
+   * Builds the `ValueWriter` that serializes a value of the given type through `gen`.
+   *
+   * @param dataType type of the value to write
+   * @param name     dotted path of the field being written; struct writers extend it with each
+   *                 nested field name and pass it on to `writeFields`
+   * @return a writer that reads the value at a given ordinal and emits it as JSON
+   */
+  private def makeWriter(dataType: DataType, name: String): ValueWriter = dataType match {
+    case NullType =>
+      (_: SpecializedGetters, _: Int) => gen.writeNull()
+
+    case BooleanType =>
+      (getters: SpecializedGetters, idx: Int) => gen.writeBoolean(getters.getBoolean(idx))
+
+    case ByteType =>
+      (getters: SpecializedGetters, idx: Int) => gen.writeNumber(getters.getByte(idx))
+
+    case ShortType =>
+      (getters: SpecializedGetters, idx: Int) => gen.writeNumber(getters.getShort(idx))
+
+    case IntegerType =>
+      (getters: SpecializedGetters, idx: Int) => gen.writeNumber(getters.getInt(idx))
+
+    case LongType =>
+      (getters: SpecializedGetters, idx: Int) => gen.writeNumber(getters.getLong(idx))
+
+    case FloatType =>
+      (getters: SpecializedGetters, idx: Int) => gen.writeNumber(getters.getFloat(idx))
+
+    case DoubleType =>
+      (getters: SpecializedGetters, idx: Int) => gen.writeNumber(getters.getDouble(idx))
+
+    case StringType =>
+      (getters: SpecializedGetters, idx: Int) =>
+        gen.writeString(getters.getUTF8String(idx).toString)
+
+    // Timestamps and dates are written as strings produced by the configured formatters.
+    case TimestampType =>
+      (getters: SpecializedGetters, idx: Int) =>
+        gen.writeString(timestampFormatter.format(getters.getLong(idx)))
+
+    case DateType =>
+      (getters: SpecializedGetters, idx: Int) =>
+        gen.writeString(dateFormatter.format(getters.getInt(idx)))
+
+    case CalendarIntervalType =>
+      (getters: SpecializedGetters, idx: Int) =>
+        gen.writeString(getters.getInterval(idx).toString)
+
+    case BinaryType =>
+      (getters: SpecializedGetters, idx: Int) => gen.writeBinary(getters.getBinary(idx))
+
+    case decimal: DecimalType =>
+      (getters: SpecializedGetters, idx: Int) =>
+        gen.writeNumber(
+          getters.getDecimal(idx, decimal.precision, decimal.scale).toJavaBigDecimal)
+
+    case struct: StructType =>
+      // Nested writers carry the extended dotted path of their field.
+      val nestedWriters = struct.map(field => makeWriter(field.dataType, name + "." + field.name))
+      (getters: SpecializedGetters, idx: Int) =>
+        writeObject(writeFields(name, getters.getStruct(idx, struct.length), struct, nestedWriters))
+
+    case array: ArrayType =>
+      val elementWriter = makeWriter(array.elementType, name)
+      (getters: SpecializedGetters, idx: Int) =>
+        writeArray(writeArrayData(getters.getArray(idx), elementWriter))
+
+    case map: MapType =>
+      val valueWriter = makeWriter(map.valueType, name)
+      (getters: SpecializedGetters, idx: Int) =>
+        writeObject(writeMapData(getters.getMap(idx), map, valueWriter))
+
+    case GeometryUDT =>
+      // Only non-primary geometry columns are written through this writer; they are emitted
+      // as WKT strings.
+      (getters: SpecializedGetters, idx: Int) => {
+        val geometry = GeometryUDT.deserialize(getters.getBinary(idx))
+        gen.writeString(wktWriter.write(geometry))
+      }
+
+    // Values of a UDT are expected to be in the value type of the UDT's SQL type, not in the
+    // user-defined class. For example, VectorUDT's SQL type is an array of double, so the value
+    // seen here is an ArrayData rather than a Vector.
+    case udt: UserDefinedType[_] =>
+      makeWriter(udt.sqlType, name)
+
+    case _ =>
+      (getters: SpecializedGetters, idx: Int) =>
+        val value = getters.get(idx, dataType)
+        throw new IllegalArgumentException(
+          s"Unsupported dataType: ${dataType.catalogString} with value $value")
+  }
+
+  private def writeObject(f: => Unit): Unit = {
+    gen.writeStartObject()
+    f
+    gen.writeEndObject()
+  }
+
+  /**
+   * Writes the fields of `row` as key/value pairs of the JSON object that is currently open.
+   *
+   * The field whose dotted path equals `geometryColumnName` is not written here: its GeoJSON
+   * form is stored in `currentGeoJsonString`. When `hasPropertiesField` is set, fields whose
+   * path is `type` or `geometry` are skipped. Null fields are written as JSON null unless
+   * `options.ignoreNullFields` is set.
+   *
+   * @param name         dotted path of the enclosing struct, empty for the root row
+   * @param row          the row (or nested struct) to write
+   * @param schema       schema of `row`
+   * @param fieldWriters one writer per field of `schema`, in schema order
+   */
+  private def writeFields(name: String, row: InternalRow, schema: StructType, fieldWriters: Seq[ValueWriter]): Unit = {
+    var idx = 0
+    while (idx < row.numFields) {
+      val field = schema(idx)
+      val qualifiedName = if (name.isEmpty) field.name else s"$name.${field.name}"
+      if (qualifiedName == geometryColumnName) {
+        // Primary geometry: remember it instead of writing it in place.
+        currentGeoJsonString =
+          if (row.isNullAt(idx)) None else Some(geometryToGeoJson(row.getBinary(idx)))
+      } else {
+        val skipped =
+          hasPropertiesField && (qualifiedName == "type" || qualifiedName == "geometry")
+        if (!skipped) {
+          if (!row.isNullAt(idx)) {
+            gen.writeFieldName(field.name)
+            fieldWriters(idx).apply(row, idx)
+          } else if (!options.ignoreNullFields) {
+            gen.writeFieldName(field.name)
+            gen.writeNull()
+          }
+        }
+      }
+      idx += 1
+    }
+  }
+
+  private def writeArray(f: => Unit): Unit = {
+    gen.writeStartArray()
+    f
+    gen.writeEndArray()
+  }
+
+  /**
+   * Writes every element of `array` with `fieldWriter`, emitting JSON null for null elements.
+   * The surrounding array delimiters are not written here.
+   */
+  private def writeArrayData(array: ArrayData, fieldWriter: ValueWriter): Unit = {
+    val size = array.numElements()
+    var idx = 0
+    while (idx < size) {
+      if (array.isNullAt(idx)) gen.writeNull() else fieldWriter.apply(array, idx)
+      idx += 1
+    }
+  }
+
+  /**
+   * Writes the entries of `map` as JSON object members: each key is converted with `toString`
+   * and used as the member name, each value is written with `fieldWriter`, and null values
+   * are written as JSON null. The surrounding object delimiters are not written here.
+   */
+  private def writeMapData(map: MapData, mapType: MapType, fieldWriter: ValueWriter): Unit = {
+    val keys = map.keyArray()
+    val values = map.valueArray()
+    val size = map.numElements()
+    var idx = 0
+    while (idx < size) {
+      gen.writeFieldName(keys.get(idx, mapType.keyType).toString)
+      if (values.isNullAt(idx)) gen.writeNull() else fieldWriter.apply(values, idx)
+      idx += 1
+    }
+  }
+
+  def close(): Unit = gen.close()
+
+  /**
+   * Transforms a single `InternalRow` to JSON object using Jackson.
+   * This api calling will be validated through accessing `rootFieldWriters`.
+   *
+   * @param row The row to convert
+   */
+  /**
+   * Writes a single `InternalRow` as one GeoJSON Feature object using Jackson.
+   *
+   * The output starts with `"type": "Feature"`, followed by the row's fields and finally the
+   * `geometry` member, which is JSON null when no primary geometry value was captured while
+   * writing the fields.
+   *
+   * @param row The row to convert
+   */
+  def write(row: InternalRow): Unit = {
+    currentGeoJsonString = None
+
+    gen.writeStartObject()
+    gen.writeStringField("type", "Feature")
+
+    // Without a usable `properties` column, the fields are wrapped in a `properties` object;
+    // otherwise they are written directly at the top level of the feature.
+    val wrapInProperties = !hasPropertiesField
+    if (wrapInProperties) {
+      gen.writeFieldName("properties")
+      gen.writeStartObject()
+    }
+    writeFields(name = "", row = row, schema = schema, fieldWriters = rootFieldWriters)
+    if (wrapInProperties) {
+      gen.writeEndObject()
+    }
+
+    // The geometry is written last, from the value captured by `writeFields`.
+    gen.writeFieldName("geometry")
+    currentGeoJsonString.fold(gen.writeNull())(geoJson => gen.writeRawValue(geoJson))
+
+    // end of feature
+    gen.writeEndObject()
+  }
+
+  def writeLineEnding(): Unit = {
+    // Note that JSON uses writer with UTF-8 charset. This string will be 
written out as UTF-8.
+    gen.writeRaw(lineSeparator)
+  }
+}
diff --git 
a/spark/common/src/main/scala/org/apache/spark/sql/sedona_sql/io/geojson/GeoJSONOutputWriter.scala
 
b/spark/common/src/main/scala/org/apache/spark/sql/sedona_sql/io/geojson/GeoJSONOutputWriter.scala
new file mode 100644
index 000000000..58239b12e
--- /dev/null
+++ 
b/spark/common/src/main/scala/org/apache/spark/sql/sedona_sql/io/geojson/GeoJSONOutputWriter.scala
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.sedona_sql.io.geojson
+
+import org.apache.hadoop.fs.Path
+import org.apache.hadoop.mapreduce.TaskAttemptContext
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.json.JSONOptions
+import org.apache.spark.sql.execution.datasources.{CodecStreams, OutputWriter}
+import org.apache.spark.sql.types.StructType
+
+import java.nio.charset.{Charset, StandardCharsets}
+
+/**
+ * Output writer that emits every row as a GeoJSON `Feature` followed by a line ending.
+ *
+ * This is taken from
+ * [[org.apache.spark.sql.execution.datasources.json.JsonOutputWriter]] with slight
+ * modifications to write GeoJSON output.
+ *
+ * @param path file this writer writes to
+ * @param options JSON options; `encoding` selects the output charset (UTF-8 when not set)
+ * @param dataSchema schema handed to the [[GeoJSONJacksonGenerator]]
+ * @param geometryColumnName name of the geometry column, forwarded to the generator
+ * @param context task attempt context used by `CodecStreams` to open the output stream writer
+ */
+class GeoJSONOutputWriter(
+  val path: String,
+  options: JSONOptions,
+  dataSchema: StructType,
+  geometryColumnName: String,
+  context: TaskAttemptContext)
+  extends OutputWriter with Logging {
+
+  // Charset requested through the options, falling back to UTF-8.
+  private val encoding = options.encoding match {
+    case Some(charsetName) => Charset.forName(charsetName)
+    case None => StandardCharsets.UTF_8
+  }
+
+  private val writer = CodecStreams.createOutputStreamWriter(context, new Path(path), encoding)
+
+  // create the Generator without separator inserted between 2 records
+  private[this] val gen =
+    new GeoJSONJacksonGenerator(dataSchema, geometryColumnName, writer, options)
+
+  /** Writes `row` as one feature and terminates it with the generator's line ending. */
+  override def write(row: InternalRow): Unit = {
+    gen.write(row)
+    gen.writeLineEnding()
+  }
+
+  /**
+   * Closes the generator and then the stream writer.
+   *
+   * The stream writer is closed in a `finally` block so that it is released even when
+   * closing the generator throws (for example while flushing buffered output).
+   */
+  override def close(): Unit = {
+    try {
+      gen.close()
+    } finally {
+      writer.close()
+    }
+  }
+}
diff --git 
a/spark/common/src/main/scala/org/apache/spark/sql/sedona_sql/io/geojson/GeoJSONUtils.scala
 
b/spark/common/src/main/scala/org/apache/spark/sql/sedona_sql/io/geojson/GeoJSONUtils.scala
new file mode 100644
index 000000000..6ab7ba7c8
--- /dev/null
+++ 
b/spark/common/src/main/scala/org/apache/spark/sql/sedona_sql/io/geojson/GeoJSONUtils.scala
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.sedona_sql.io.geojson
+
+import org.apache.sedona.common.Constructors.geomFromText
+import org.apache.sedona.common.Functions.asGeoJson
+import org.apache.sedona.common.enums.FileDataSplitter
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.util.{ArrayData, GenericArrayData}
+import org.apache.spark.sql.sedona_sql.UDT.GeometryUDT
+import org.apache.spark.sql.types._
+import org.apache.spark.unsafe.types.UTF8String
+
+
+/**
+ * Conversions between GeoJSON text and serialized Sedona geometries, applied to Catalyst
+ * schemas and rows.
+ */
+object GeoJSONUtils {
+
+  /**
+   * Rewrites the type of every field named `geometry` to `datatype`.
+   *
+   * The search descends into nested structs and into arrays whose elements are structs.
+   * A field named `geometry` is replaced whatever its current type is and is not descended
+   * into. Every other field is returned as is.
+   *
+   * @param schema schema to rewrite
+   * @param datatype type given to the `geometry` fields
+   * @return a new schema with the replaced field types
+   */
+  def updateGeometrySchema(schema: StructType, datatype: DataType): StructType = {
+    StructType(schema.fields.map {
+      case StructField("geometry", _, nullable, metadata) =>
+        StructField("geometry", datatype, nullable, metadata)
+      case StructField(name, dataType: StructType, nullable, metadata) =>
+        StructField(name, updateGeometrySchema(dataType, datatype), nullable, metadata)
+      case StructField(name, ArrayType(elementType: StructType, containsNull), nullable, metadata) =>
+        val updatedElementType = updateGeometrySchema(elementType, datatype)
+        StructField(name, ArrayType(updatedElementType, containsNull), nullable, metadata)
+      case other => other
+    })
+  }
+
+  /** Parses a GeoJSON geometry string and returns the geometry in `GeometryUDT` serialized form. */
+  def geoJsonToGeometry(geoJson: String): Array[Byte] = {
+    val geometry = geomFromText(geoJson, FileDataSplitter.GEOJSON)
+    GeometryUDT.serialize(geometry)
+  }
+
+  /** Deserializes a `GeometryUDT` binary value and renders the geometry as GeoJSON. */
+  def geometryToGeoJson(geometryBinary: Array[Byte]): String = {
+    val geometry = GeometryUDT.deserialize(geometryBinary)
+    asGeoJson(geometry)
+  }
+
+  /**
+   * Converts the array stored at `index` of `row`.
+   *
+   * Arrays of structs are converted element by element (null elements stay null); arrays of
+   * any other element type are returned unchanged. A null or empty array yields an empty array.
+   *
+   * @param row row holding the array
+   * @param index ordinal of the array field in `row`
+   * @param elementType element type of the array
+   * @param toGeometry `true` to convert GeoJSON strings to geometries, `false` for the
+   *                   opposite direction
+   */
+  def handleArray(
+    row: InternalRow,
+    index: Int,
+    elementType: DataType,
+    toGeometry: Boolean): ArrayData = {
+    val arrayData = row.getArray(index)
+    if (arrayData == null || arrayData.numElements() == 0) {
+      // A missing array is normalized to an empty one, exactly like an empty input.
+      new GenericArrayData(Seq.empty[Any])
+    } else {
+      elementType match {
+        case structType: StructType =>
+          val convertedArray = (0 until arrayData.numElements()).map { i =>
+            if (arrayData.isNullAt(i)) {
+              null
+            } else {
+              val innerRow = arrayData.getStruct(i, structType.fields.length)
+              if (toGeometry) convertGeoJsonToGeometry(innerRow, structType)
+              else convertGeometryToGeoJson(innerRow, structType)
+            }
+          }
+          new GenericArrayData(convertedArray)
+        case _ => arrayData
+      }
+    }
+  }
+
+  /**
+   * Returns a copy of `row` in which every `geometry` field of type `GeometryUDT` holds the
+   * GeoJSON text of the geometry. Nested structs and arrays are converted recursively.
+   * Null geometries and null nested structs are carried over as null.
+   *
+   * @param row row laid out according to `schema`
+   * @param schema schema describing `row`
+   */
+  def convertGeometryToGeoJson(row: InternalRow, schema: StructType): InternalRow = {
+    val newValues = new Array[Any](schema.fields.length)
+
+    schema.fields.zipWithIndex.foreach {
+      case (StructField("geometry", _: GeometryUDT, _, _), index) =>
+        // A null geometry must not reach the deserializer.
+        newValues(index) =
+          if (row.isNullAt(index)) null
+          else UTF8String.fromString(geometryToGeoJson(row.getBinary(index)))
+      case (StructField(_, structType: StructType, _, _), index) =>
+        // Recursing into a null struct would dereference null.
+        newValues(index) =
+          if (row.isNullAt(index)) null
+          else convertGeometryToGeoJson(row.getStruct(index, structType.fields.length), structType)
+      case (StructField(_, arrayType: ArrayType, _, _), index) =>
+        newValues(index) = handleArray(row, index, arrayType.elementType, toGeometry = false)
+      case (_, index) =>
+        newValues(index) = row.get(index, schema.fields(index).dataType)
+    }
+
+    InternalRow.fromSeq(newValues)
+  }
+
+
+  /**
+   * Returns a copy of `row` in which every string-typed `geometry` field holds the serialized
+   * geometry parsed from its GeoJSON text. Nested structs and arrays are converted recursively.
+   * Null geometries (GeoJSON allows a Feature's geometry to be null) and null nested structs
+   * are carried over as null.
+   *
+   * @param row row laid out according to `schema`
+   * @param schema schema describing `row`
+   */
+  def convertGeoJsonToGeometry(row: InternalRow, schema: StructType): InternalRow = {
+    val newValues = new Array[Any](schema.fields.length)
+
+    schema.fields.zipWithIndex.foreach {
+      case (StructField("geometry", StringType, _, _), index) =>
+        // `getString` dereferences the stored value, so null has to be handled first.
+        newValues(index) =
+          if (row.isNullAt(index)) null
+          else geoJsonToGeometry(row.getString(index))
+      case (StructField(_, structType: StructType, _, _), index) =>
+        // Recursing into a null struct would dereference null.
+        newValues(index) =
+          if (row.isNullAt(index)) null
+          else convertGeoJsonToGeometry(row.getStruct(index, structType.fields.length), structType)
+      case (StructField(_, arrayType: ArrayType, _, _), index) =>
+        newValues(index) = handleArray(row, index, arrayType.elementType, toGeometry = true)
+      case (_, index) =>
+        newValues(index) = row.get(index, schema.fields(index).dataType)
+    }
+
+    InternalRow.fromSeq(newValues)
+  }
+
+}
diff --git 
a/spark/common/src/main/scala/org/apache/spark/sql/sedona_sql/io/geojson/SparkCompatUtil.scala
 
b/spark/common/src/main/scala/org/apache/spark/sql/sedona_sql/io/geojson/SparkCompatUtil.scala
new file mode 100644
index 000000000..3ce1df7d7
--- /dev/null
+++ 
b/spark/common/src/main/scala/org/apache/spark/sql/sedona_sql/io/geojson/SparkCompatUtil.scala
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.sedona_sql.io.geojson
+
+import org.apache.spark.sql.catalyst.analysis.Resolver
+import org.apache.spark.sql.catalyst.json.{JSONOptions, JacksonParser}
+import org.apache.spark.sql.catalyst.util.{DateFormatter, TimestampFormatter}
+import org.apache.spark.sql.catalyst.util.LegacyDateFormats.LegacyDateFormat
+import org.apache.spark.sql.types.{DataType, StructField, StructType}
+
+import scala.reflect.runtime.{universe => ru}
+import java.time.ZoneId
+import java.util.Locale
+
+/**
+ * Making GeoJSONFileFormat and GeoJSONOutputWriter classes compatible with Spark 3.0.x.
+ *
+ * Each helper resolves the Spark member, overload or constructor to call through Scala
+ * runtime reflection, so the choice is made against the Spark classes present at run time.
+ */
+object SparkCompatUtil {
+  /**
+   * Looks up a (possibly nested) field of `schema` by following `path` one name at a time.
+   *
+   * Here we are defining our own findNestedField method instead of using
+   * StructType.findNestedField method for compatibility with Spark 3.0.x
+   *
+   * @param schema struct to search
+   * @param path field names, outermost first
+   * @param resolver name comparison applied to every path element
+   * @return the matching field; `None` when a name does not match, when an intermediate
+   *         field is not a struct, or when `path` is empty
+   */
+  def findNestedField(schema: StructType, path: Array[String], resolver: 
Resolver): Option[StructField] = {
+    path match {
+      // Last path element: look it up directly in the current struct.
+      case Array(part) =>
+        schema.find(f => resolver(f.name, part))
+      // More elements follow: the head must name a struct field to descend into.
+      case Array(head, tail @ _*) =>
+        schema.find(f => resolver(f.name, head)).flatMap { f =>
+          f.dataType match {
+            case st: StructType => findNestedField(st, tail.toArray, resolver)
+            case _ => None
+          }
+        }
+      // Empty path.
+      case _ => None
+    }
+  }
+
+  /**
+   * Builds a `TimestampFormatter` from the timestamp pattern stored in `options`.
+   *
+   * The pattern is read reflectively from `JSONOptions.timestampFormat` when that member is
+   * a method, otherwise from `JSONOptions.timestampFormatInWrite`.
+   * NOTE(review): the `...InWrite` member is the only fallback, also when `isParsing` is
+   * true -- confirm that is intended.
+   *
+   * @throws Exception when neither member is found on `JSONOptions`
+   */
+  def constructTimestampFormatter(
+    options: JSONOptions,
+    zoneId: ZoneId,
+    locale: Locale,
+    legacyFormat: LegacyDateFormat,
+    isParsing: Boolean
+  ): TimestampFormatter = {
+    val mirror = ru.runtimeMirror(getClass.getClassLoader)
+    // Look up both candidate members; only the one that is a method gets invoked below.
+    val timestampFormatTerm = 
ru.typeOf[JSONOptions].decl(ru.TermName("timestampFormat"))
+    val timestampFormatInWriteTerm = 
ru.typeOf[JSONOptions].decl(ru.TermName("timestampFormatInWrite"))
+    val instanceMirror = mirror.reflect(options)
+    val format = if (timestampFormatTerm.isMethod) {
+      
instanceMirror.reflectMethod(timestampFormatTerm.asMethod).apply().asInstanceOf[String]
+    } else if (timestampFormatInWriteTerm.isMethod) {
+      
instanceMirror.reflectMethod(timestampFormatInWriteTerm.asMethod).apply().asInstanceOf[String]
+    } else {
+      throw new Exception("Neither timestampFormat nor timestampFormatInWrite 
found in JSONOptions")
+    }
+    TimestampFormatter(
+      format,
+      zoneId,
+      locale,
+      legacyFormat,
+      isParsing)
+  }
+
+  /**
+   * Builds a `DateFormatter` from the date pattern stored in `options`.
+   *
+   * The pattern is read reflectively from `JSONOptions.dateFormat` when that member is a
+   * method, otherwise from `JSONOptions.dateFormatInWrite`. The formatter is then created
+   * through whichever `DateFormatter.apply` overload exists: the five-parameter one
+   * (which takes `zoneId`) is preferred, then the four-parameter one whose first parameter
+   * is a `String` (which does not take `zoneId`).
+   *
+   * @throws Exception when no pattern member or no suitable `apply` overload is found
+   */
+  def constructDateFormatter(
+    options: JSONOptions,
+    zoneId: ZoneId,
+    locale: Locale,
+    legacyFormat: LegacyDateFormat,
+    isParsing: Boolean): DateFormatter = {
+    val mirror = ru.runtimeMirror(getClass.getClassLoader)
+
+    val dateFormatTerm = ru.typeOf[JSONOptions].decl(ru.TermName("dateFormat"))
+    val dateFormatInWriteTerm = 
ru.typeOf[JSONOptions].decl(ru.TermName("dateFormatInWrite"))
+    val instanceMirror = mirror.reflect(options)
+    val format = if (dateFormatTerm.isMethod) {
+      
instanceMirror.reflectMethod(dateFormatTerm.asMethod).apply().asInstanceOf[String]
+    } else if (dateFormatInWriteTerm.isMethod) {
+      
instanceMirror.reflectMethod(dateFormatInWriteTerm.asMethod).apply().asInstanceOf[String]
+    } else {
+      throw new Exception("Neither dateFormat nor dateFormatInWrite found in 
JSONOptions")
+    }
+
+    // Resolve the DateFormatter companion object and collect its `apply` overloads.
+    val dateFormatterClass = 
mirror.staticClass("org.apache.spark.sql.catalyst.util.DateFormatter$")
+    val dateFormatterModule = mirror.staticModule(dateFormatterClass.fullName)
+    val dateFormatterInstance = mirror.reflectModule(dateFormatterModule)
+    val applyMethods = 
dateFormatterClass.toType.members.filter(_.name.decodedName.toString == "apply")
+    // Prefer the five-parameter overload; fall back to the four-parameter one.
+    applyMethods.find(_.typeSignature.paramLists.flatten.size == 5) match {
+      case Some(applyMethod) =>
+        
mirror.reflect(dateFormatterInstance.instance).reflectMethod(applyMethod.asMethod)(
+          format, zoneId, locale, legacyFormat, isParsing
+        ).asInstanceOf[DateFormatter]
+      case None =>
+        applyMethods.find { method =>
+          val params = method.typeSignature.paramLists.flatten
+          params.size == 4 &&
+            // get rid of the variant taking Option[String] as first parameter
+            params.head.typeSignature <:< ru.typeOf[String]
+        } match {
+          case Some(applyMethod) =>
+            
mirror.reflect(dateFormatterInstance.instance).reflectMethod(applyMethod.asMethod)(
+              format, locale, legacyFormat, isParsing
+            ).asInstanceOf[DateFormatter]
+          case None =>
+            throw new Exception("No suitable apply method found in 
DateFormatter")
+        }
+    }
+  }
+
+  /**
+   * Instantiates Spark's `JacksonParser` through whichever constructor is available.
+   *
+   * A three-parameter constructor is called with `(schema, options, allowArrayAsStructs)`;
+   * otherwise a four-parameter constructor is called with an additional `Seq.empty`
+   * (presumably the parser's filters argument -- TODO confirm).
+   *
+   * @throws Exception when neither constructor exists
+   */
+  def constructJacksonParser(
+    schema: DataType,
+    options: JSONOptions,
+    allowArrayAsStructs: Boolean
+  ): JacksonParser = {
+    val mirror = ru.runtimeMirror(getClass.getClassLoader)
+    val jacksonParserClass = 
mirror.staticClass("org.apache.spark.sql.catalyst.json.JacksonParser")
+
+    val constructorMethods = 
jacksonParserClass.toType.members.filter(_.isConstructor)
+
+    // Constructors are told apart by their parameter count only.
+    constructorMethods.find(_.typeSignature.paramLists.flatten.size == 3) 
match {
+      case Some(constructorMethod) =>
+        
mirror.reflectClass(jacksonParserClass).reflectConstructor(constructorMethod.asMethod)(
+          schema, options, allowArrayAsStructs
+        ).asInstanceOf[JacksonParser]
+      case None =>
+        constructorMethods.find(_.typeSignature.paramLists.flatten.size == 4) 
match {
+          case Some(constructorMethod) =>
+            
mirror.reflectClass(jacksonParserClass).reflectConstructor(constructorMethod.asMethod)(
+              schema, options, allowArrayAsStructs, Seq.empty
+            ).asInstanceOf[JacksonParser]
+          case None =>
+            throw new Exception("No suitable constructor found in 
JacksonParser")
+        }
+    }
+  }
+}
diff --git a/spark/common/src/test/resources/geojson/core-item.json 
b/spark/common/src/test/resources/geojson/core-item.json
new file mode 100644
index 000000000..b9bdbb014
--- /dev/null
+++ b/spark/common/src/test/resources/geojson/core-item.json
@@ -0,0 +1 @@
+{"stac_version": "1.0.0","stac_extensions": [],"type": "Feature","id": 
"20201211_223832_CS2","bbox": 
[172.91173669923782,1.3438851951615003,172.95469614953714,1.3690476620161975],"geometry":
 {"type": "Polygon","coordinates": 
[[[172.91173669923782,1.3438851951615003],[172.95469614953714,1.3438851951615003],[172.95469614953714,1.3690476620161975],[172.91173669923782,1.3690476620161975],[172.91173669923782,1.3438851951615003]]]},"properties":
 {"title": "Core Item","description": "A sample S [...]
diff --git 
a/spark/common/src/test/resources/geojson/geojson_feature-collection.json 
b/spark/common/src/test/resources/geojson/geojson_feature-collection.json
new file mode 100644
index 000000000..08803b13c
--- /dev/null
+++ b/spark/common/src/test/resources/geojson/geojson_feature-collection.json
@@ -0,0 +1,76 @@
+{
+  "type": "FeatureCollection",
+  "features": [
+    { "type": "Feature",
+      "geometry": {"type": "Point", "coordinates": [102.0, 0.5]},
+      "properties": {"prop0": "value0"}
+    },
+    { "type": "Feature",
+      "geometry": {
+        "type": "LineString",
+        "coordinates": [
+          [102.0, 0.0], [103.0, 1.0], [104.0, 0.0], [105.0, 1.0]
+        ]
+      },
+      "properties": {
+        "prop0": "value1",
+        "prop1": 0.0
+      }
+    },
+    { "type": "Feature",
+      "geometry": {
+        "type": "Polygon",
+        "coordinates": [
+          [ [100.0, 0.0], [101.0, 0.0], [101.0, 1.0],
+            [100.0, 1.0], [100.0, 0.0] ]
+        ]
+      },
+      "properties": {
+        "prop0": "value2",
+        "prop1": {"this": "that"}
+      }
+    },
+    {
+      "type": "Feature",
+      "geometry": {
+        "type": "MultiLineString",
+        "coordinates": [
+          [[170.0, 45.0], [180.0, 45.0]],
+          [[-180.0, 45.0], [-170.0, 45.0]]
+        ]
+      },
+      "properties": {
+        "prop0": "value3",
+        "prop1": {"this": "that"}
+      }
+    },
+    {
+      "type": "Feature",
+      "geometry": {
+        "type": "MultiPolygon",
+        "coordinates": [
+          [[[180.0, 40.0], [180.0, 50.0], [170.0, 50.0], [170.0, 40.0], 
[180.0, 40.0]]],
+          [[[-170.0, 40.0], [-170.0, 50.0], [-180.0, 50.0], [-180.0, 40.0], 
[-170.0, 40.0]]]
+        ]
+      },
+      "properties": {
+        "prop0": "value4",
+        "prop1": {"this": "that"}
+      }
+    },
+    {
+      "type": "Feature",
+      "geometry": {
+        "type": "MultiPoint",
+        "coordinates": [
+          [100.0, 0.0],
+          [101.0, 1.0]
+        ]
+      },
+      "properties": {
+        "prop0": "value5",
+        "prop1": {"this": "that"}
+      }
+    }
+  ]
+}
\ No newline at end of file
diff --git a/spark/common/src/test/resources/geojson/test1.json 
b/spark/common/src/test/resources/geojson/test1.json
new file mode 100644
index 000000000..ed8a71dc5
--- /dev/null
+++ b/spark/common/src/test/resources/geojson/test1.json
@@ -0,0 +1,39 @@
+{
+  "type": "Feature",
+  "geometry": {
+    "type": "Polygon",
+    "coordinates": [
+      [
+        [
+          172.91173669923782,
+          1.3438851951615003
+        ],
+        [
+          172.95469614953714,
+          1.3438851951615003
+        ],
+        [
+          172.95469614953714,
+          1.3690476620161975
+        ],
+        [
+          172.91173669923782,
+          1.3690476620161975
+        ],
+        [
+          172.91173669923782,
+          1.3438851951615003
+        ]
+      ]
+    ]
+  },
+  "properties": {
+    "title": "Core Item",
+    "description": "A sample STAC Item that includes examples of all common 
metadata",
+    "start_datetime": "2020-12-11T22:38:32.125Z",
+    "end_datetime": "2020-12-11T22:38:32.327Z",
+    "created": "2020-12-12T01:48:13.725Z",
+    "gsd": 0.512
+  },
+  "assets": []
+}
diff --git a/spark/common/src/test/resources/geojson/test12.json 
b/spark/common/src/test/resources/geojson/test12.json
new file mode 100644
index 000000000..6b75c89a2
--- /dev/null
+++ b/spark/common/src/test/resources/geojson/test12.json
@@ -0,0 +1,41 @@
+{
+  "type": "Feature",
+  "geometry": {
+    "type": "Polygon",
+    "coordinates": [
+      [
+        [
+          172.91173669923782,
+          1.3438851951615003
+        ],
+        [
+          172.95469614953714,
+          1.3438851951615003
+        ],
+        [
+          172.95469614953714,
+          1.3690476620161975
+        ],
+        [
+          172.91173669923782,
+          1.3690476620161975
+        ],
+        [
+          172.91173669923782,
+          1.3438851951615003
+        ]
+      ]
+    ]
+  },
+  "properties": {
+    "title": "Core Item",
+    "description": "A sample STAC Item that includes examples of all common 
metadata",
+    "start_datetime": "2020-12-11T22:38:32.125Z",
+    "end_datetime": "2020-12-11T22:38:32.327Z",
+    "created": "2020-12-12T01:48:13.725Z",
+    "gsd": 0.512
+  },
+  "assets": {
+    "proj": [[]]
+  }
+}
diff --git a/spark/common/src/test/resources/geojson/test2.json 
b/spark/common/src/test/resources/geojson/test2.json
new file mode 100644
index 000000000..f2a2dec4b
--- /dev/null
+++ b/spark/common/src/test/resources/geojson/test2.json
@@ -0,0 +1,21 @@
+{
+  "type": "Feature",
+  "geometry": {
+    "type": "MultiLineString",
+    "coordinates": [
+      [
+        [170.0, 45.0], [180.0, 45.0]
+      ], [
+        [-180.0, 45.0], [-170.0, 45.0]
+      ]
+    ]
+  },
+  "properties": {
+    "title": "Core Item",
+    "description": "A sample STAC Item that includes examples of all common 
metadata",
+    "start_datetime": "2020-12-11T22:38:32.125Z",
+    "end_datetime": "2020-12-11T22:38:32.327Z",
+    "created": "2020-12-12T01:48:13.725Z",
+    "gsd": 0.512
+  }
+}
diff --git 
a/spark/common/src/test/scala/org/apache/sedona/sql/geojsonIOTests.scala 
b/spark/common/src/test/scala/org/apache/sedona/sql/geojsonIOTests.scala
new file mode 100644
index 000000000..a1e982fcd
--- /dev/null
+++ b/spark/common/src/test/scala/org/apache/sedona/sql/geojsonIOTests.scala
@@ -0,0 +1,224 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sedona.sql
+
+import org.apache.commons.io.FileUtils
+import org.apache.spark.sql.{Row, SaveMode}
+import org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema
+import org.apache.spark.sql.functions.{col, explode, expr}
+import org.locationtech.jts.geom.{Geometry, MultiLineString, Point, Polygon}
+import org.scalatest.BeforeAndAfterAll
+
+import java.io.File
+
+class geojsonIOTests extends TestBaseScala with BeforeAndAfterAll {
+  // Input fixtures, all located under `resourceFolder`.
+  // The glob matches both test1.json and test12.json.
+  val geojsondatalocation1: String = s"${resourceFolder}geojson/test1*"
+  val geojsondatalocation2: String = s"${resourceFolder}geojson/geojson_feature-collection.json"
+  val geojsondatalocation3: String = s"${resourceFolder}geojson/core-item.json"
+  val geojsondatalocation4: String = s"${resourceFolder}geojson/test2.json"
+  // Directory the tests write to; it is deleted again in `afterAll`.
+  val geojsonoutputlocation: String = s"${resourceFolder}geojson/geojson_output/"
+
+  /** Deletes the directory the tests wrote their GeoJSON output to. */
+  override def afterAll(): Unit = {
+    val outputDir = new File(geojsonoutputlocation)
+    FileUtils.deleteDirectory(outputDir)
+  }
+
+  describe("GeoJSON IO tests") {
+    // Writes a DataFrame whose geometry column is named `geometry`, then checks the output
+    // with both the plain JSON reader and the GeoJSON reader.
+    it("GeoJSON Test - Simple DataFrame writing and reading") {
+      val outputPath = geojsonoutputlocation + "/geojson_write.json"
+      val df = sparkSession.range(0, 10).toDF("id")
+        .withColumn("geometry", expr("ST_Point(id, id)"))
+        .withColumn("text", expr("concat('test', id)"))
+      df.write.format("geojson").mode(SaveMode.Overwrite).save(outputPath)
+
+      // Read the GeoJSON back using JSON reader
+      val schema = "type string, geometry string, properties struct<id:int, text:string>"
+      val dfJson = sparkSession.read.schema(schema).format("json").load(outputPath)
+      dfJson.collect().foreach { row =>
+        assert(row.getAs("geometry").toString.startsWith("{\"type\":\"Point\""))
+        assert(row.getAs[GenericRowWithSchema]("properties").getAs("text").toString.startsWith("test"))
+      }
+
+      // Read the GeoJSON back using the GeoJSON reader
+      val dfW = sparkSession.read.format("geojson").load(outputPath)
+      dfW.collect().foreach { row =>
+        val geom = row.getAs[Geometry]("geometry")
+        assert(geom.isInstanceOf[Point])
+        val properties = row.getAs[GenericRowWithSchema]("properties")
+        // This check used to be evaluated without `assert`, so it could never fail.
+        assert(properties.getAs("text").toString.startsWith("test"))
+      }
+    }
+
+    // Two geometry columns are present; `point` is selected through the `geometry.column`
+    // option, and `geom` is expected to end up in the properties as a LINESTRING string.
+    it("GeoJSON Test - Specifying geometry column other than geometry") {
+      val outputPath = geojsonoutputlocation + "/geojson_write.json"
+      val df = sparkSession.range(0, 10).toDF("id")
+        .withColumn("point", expr("ST_Point(id, id)"))
+        .withColumn("geom", expr("ST_MakeLine(ST_Point(id, id), ST_Point(id, id + 1))"))
+        .withColumn("text", expr("concat('test', id)"))
+      df.write.format("geojson").option("geometry.column", "point").mode(SaveMode.Overwrite).save(outputPath)
+
+      // Read the GeoJSON back using JSON reader
+      val schema = "type string, geometry string, properties struct<id:int, text:string, geom:string>"
+      val dfJson = sparkSession.read.schema(schema).format("json").load(outputPath)
+      dfJson.collect().foreach { row =>
+        assert(row.getAs("geometry").toString.startsWith("{\"type\":\"Point\""))
+        val properties = row.getAs[GenericRowWithSchema]("properties")
+        assert(properties.getAs("text").toString.startsWith("test"))
+        assert(properties.getAs("geom").toString.startsWith("LINESTRING"))
+      }
+
+      // Read the GeoJSON back using the GeoJSON reader
+      val dfW = sparkSession.read.format("geojson").load(outputPath)
+      dfW.collect().foreach { row =>
+        val geom = row.getAs[Geometry]("geometry")
+        assert(geom.isInstanceOf[Point])
+        val properties = row.getAs[GenericRowWithSchema]("properties")
+        // These two checks used to be evaluated without `assert`, so they could never fail.
+        assert(properties.getAs("text").toString.startsWith("test"))
+        assert(properties.getAs("geom").toString.startsWith("LINESTRING"))
+      }
+    }
+
+    // The geometry column lives inside a struct and is addressed as `nested.geom`.
+    it("GeoJSON Test - Specifying geometry column in a nested struct column") {
+      val outputPath = geojsonoutputlocation + "/geojson_write.json"
+      val df = sparkSession.range(0, 10).toDF("id")
+        .withColumn("text_outer", expr("concat('test_outer', id)"))
+        .withColumn("nested", expr("struct(id, concat('test_inner', id) AS text_inner, ST_Point(id, id) AS geom)"))
+      df.write.format("geojson").option("geometry.column", "nested.geom").mode(SaveMode.Overwrite).save(outputPath)
+
+      // Read the GeoJSON back using JSON reader
+      val schema = "type string, geometry string, properties struct<text_outer:string, nested:struct<id:int, text_inner:string>>"
+      val dfJson = sparkSession.read.schema(schema).format("json").load(outputPath)
+      dfJson.collect().foreach { row =>
+        assert(row.getAs("geometry").toString.startsWith("{\"type\":\"Point\""))
+        val properties = row.getAs[GenericRowWithSchema]("properties")
+        assert(properties.getAs("text_outer").toString.startsWith("test_outer"))
+        val nested = properties.getAs[GenericRowWithSchema]("nested")
+        assert(nested.getAs("text_inner").toString.startsWith("test_inner"))
+      }
+
+      // Read the GeoJSON back using the GeoJSON reader
+      val dfW = sparkSession.read.format("geojson").load(outputPath)
+      dfW.collect().foreach { row =>
+        val geom = row.getAs[Geometry]("geometry")
+        assert(geom.isInstanceOf[Point])
+        val properties = row.getAs[GenericRowWithSchema]("properties")
+        // This check used to be evaluated without `assert`, so it could never fail.
+        assert(properties.getAs("text_outer").toString.startsWith("test_outer"))
+        val nested = properties.getAs[GenericRowWithSchema]("nested")
+        assert(nested.getAs("text_inner").toString.startsWith("test_inner"))
+      }
+    }
+
+    // The DataFrame already has a `properties` struct column, so the remaining columns
+    // (`test_outer` here) are expected as top-level members of each feature.
+    it("GeoJSON Test - DataFrame containing properties column") {
+      val target = geojsonoutputlocation + "/geojson_write.json"
+      val source = sparkSession.range(0, 10).toDF("id")
+        .withColumn("point", expr("ST_Point(id, id)"))
+        .withColumn("test_outer", expr("concat('test_outer', id)"))
+        .withColumn("properties", expr("struct(id, concat('test', id) AS text)"))
+      source.write
+        .format("geojson")
+        .option("geometry.column", "point")
+        .mode(SaveMode.Overwrite)
+        .save(target)
+
+      // First pass: plain JSON reader with an explicit schema
+      val jsonSchema = "type string, geometry string, test_outer string, properties struct<id:int, text:string>"
+      val viaJson = sparkSession.read.schema(jsonSchema).format("json").load(target).collect()
+      for (row <- viaJson) {
+        assert(row.getAs("geometry").toString.startsWith("{\"type\":\"Point\""))
+        assert(row.getAs[String]("test_outer").startsWith("test_outer"))
+        val properties = row.getAs[GenericRowWithSchema]("properties")
+        assert(properties.getAs("text").toString.startsWith("test"))
+      }
+
+      // Second pass: GeoJSON reader
+      val viaGeoJson = sparkSession.read.format("geojson").load(target).collect()
+      for (row <- viaGeoJson) {
+        val geom = row.getAs[Geometry]("geometry")
+        assert(geom.isInstanceOf[Point])
+        assert(row.getAs[String]("test_outer").startsWith("test_outer"))
+        val properties = row.getAs[GenericRowWithSchema]("properties")
+        assert(properties.getAs("text").toString.startsWith("test"))
+      }
+    }
+
+    // Reads the pretty-printed fixtures matched by `geojson/test1*`, checks the first row,
+    // writes the data back out and compares the first row read from that output.
+    it("GeoJSON Test - Read and Write multiline GeoJSON file") {
+      val target = geojsonoutputlocation + "/geojson_write.json"
+      val dfR = sparkSession.read.format("geojson").option("multiLine", true).load(geojsondatalocation1)
+      val rowsR = dfR.collect()(0)
+
+      // NOTE(review): the two matched fixtures give `assets` different JSON shapes (an array
+      // and an object), so the column may not be read as a struct; this comparison is kept
+      // exactly as it was written.
+      assert((rowsR.getAs[GenericRowWithSchema]("assets") != null) == true)
+      assert(rowsR.getAs[String]("type") == "Feature")
+      val propertiesR = rowsR.getAs[GenericRowWithSchema]("properties")
+      assert(propertiesR.getString(0) == "2020-12-12T01:48:13.725Z")
+      assert(propertiesR.getString(1) == "A sample STAC Item that includes examples of all common metadata")
+      assert(propertiesR.getString(5) == "Core Item")
+      assert(rowsR.getAs[Polygon]("geometry").toString == "POLYGON ((172.91173669923782 1.3438851951615003, 172.95469614953714 1.3438851951615003, 172.95469614953714 1.3690476620161975, 172.91173669923782 1.3690476620161975, 172.91173669923782 1.3438851951615003))")
+
+      dfR.write.format("geojson").option("multiLine", true).mode(SaveMode.Overwrite).save(target)
+
+      // The written output is read back without the multiLine option.
+      val rowsW = sparkSession.read.format("geojson").load(target).collect()(0)
+      assert(propertiesR == rowsW.getAs[GenericRowWithSchema]("properties"))
+      assert(rowsR.getAs[Polygon]("geometry") == rowsW.getAs[Polygon]("geometry"))
+      assert(rowsR.getAs[String]("type") == rowsW.getAs[String]("type"))
+    }
+    // Round-trips the MultiLineString fixture (test2.json) through the GeoJSON reader and writer.
+    it("GeoJSON Test - Read and Write MultilineString geometry") {
+      val target = geojsonoutputlocation + "/geojson_write.json"
+      val dfR = sparkSession.read.format("geojson").option("multiLine", true).load(geojsondatalocation4)
+      val rowsR = dfR.collect()(0)
+
+      assert(rowsR.getAs[String]("type") == "Feature")
+      val propertiesR = rowsR.getAs[GenericRowWithSchema]("properties")
+      assert(propertiesR.getString(0) == "2020-12-12T01:48:13.725Z")
+      assert(propertiesR.getString(1) == "A sample STAC Item that includes examples of all common metadata")
+      assert(propertiesR.getString(5) == "Core Item")
+      assert(rowsR.getAs[MultiLineString]("geometry").toString == "MULTILINESTRING ((170 45, 180 45), (-180 45, -170 45))")
+
+      dfR.write.format("geojson").option("multiLine", true).mode(SaveMode.Overwrite).save(target)
+
+      // The written output is read back without the multiLine option.
+      val rowsW = sparkSession.read.format("geojson").load(target).collect()(0)
+      assert(propertiesR == rowsW.getAs[GenericRowWithSchema]("properties"))
+      assert(rowsR.getAs[MultiLineString]("geometry") == rowsW.getAs[MultiLineString]("geometry"))
+      assert(rowsR.getAs[String]("type") == rowsW.getAs[String]("type"))
+    }
+    it("GeoJSON Test - feature collection test") {
+      val dfR = sparkSession.read.format("geojson").option("multiLine", 
true).load(geojsondatalocation2)
+      val rowsR = dfR.collect()(0)
+
+      assert(rowsR.getAs[Seq[Row]]("features")(0).get(0).toString == "POINT 
(102 0.5)")
+      assert(rowsR.getAs[Seq[Row]]("features")(1).get(0).toString == 
"LINESTRING (102 0, 103 1, 104 0, 105 1)")
+      assert(rowsR.getAs[Seq[Row]]("features")(2).get(0).toString == "POLYGON 
((100 0, 101 0, 101 1, 100 1, 100 0))")
+      assert(rowsR.getAs[Seq[Row]]("features")(3).get(0).toString == 
"MULTILINESTRING ((170 45, 180 45), (-180 45, -170 45))")
+      assert(rowsR.getAs[Seq[Row]]("features")(4).get(0).toString == 
"MULTIPOLYGON (((180 40, 180 50, 170 50, 170 40, 180 40)), ((-170 40, -170 50, 
-180 50, -180 40, -170 40)))")
+
+      val df = 
dfR.select(explode(col("features")).alias("feature")).selectExpr("feature.*")
+      df.write.format("geojson").option("multiLine", 
true).mode(SaveMode.Overwrite).save(geojsonoutputlocation + 
"/geojson_write.json")
+
+      val dfW = sparkSession.read.format("geojson").load(geojsonoutputlocation 
+ "/geojson_write.json")
+      val rowsW = dfW.collect()
+      assert(rowsW.length == 6)
+      assert(rowsW(0).getAs("geometry").toString == "POINT (102 0.5)")
+      assert(rowsW(1).getAs("geometry").toString == "LINESTRING (102 0, 103 1, 
104 0, 105 1)")
+      assert(rowsW(2).getAs("geometry").toString == "POLYGON ((100 0, 101 0, 
101 1, 100 1, 100 0))")
+      assert(rowsW(3).getAs("geometry").toString == "MULTILINESTRING ((170 45, 
180 45), (-180 45, -170 45))")
+      assert(rowsW(4).getAs("geometry").toString == "MULTIPOLYGON (((180 40, 
180 50, 170 50, 170 40, 180 40)), ((-170 40, -170 50, -180 50, -180 40, -170 
40)))")
+    }
+    it("GeoJSON Test - read and write single line STAC item") {
+      val dfR = sparkSession.read.format("geojson").load(geojsondatalocation3)
+      val rowsR = dfR.collect()(0)
+      assert(rowsR.getAs[String]("type") == "Feature")
+      assert(rowsR.getAs[String]("stac_version") == "1.0.0")
+      assert(rowsR.getAs[Polygon]("geometry").toString == "POLYGON 
((172.91173669923782 1.3438851951615003, 172.95469614953714 1.3438851951615003, 
172.95469614953714 1.3690476620161975, 172.91173669923782 1.3690476620161975, 
172.91173669923782 1.3438851951615003))")
+
+      
dfR.write.format("geojson").mode(SaveMode.Overwrite).save(geojsonoutputlocation 
+ "/geojson_write.json")
+
+      val dfW = sparkSession.read.format("geojson").load(geojsonoutputlocation 
+ "/geojson_write.json")
+
+      val rowsW = dfW.collect()(0)
+      assert(rowsR.getAs[GenericRowWithSchema]("assets") == 
rowsW.getAs[GenericRowWithSchema]("assets"))
+      assert(rowsR.getAs[String]("stac_version") == 
rowsW.getAs[String]("stac_version"))
+      assert(rowsR.getAs[Polygon]("geometry") == 
rowsW.getAs[Polygon]("geometry"))
+    }
+  }
+}

Reply via email to