This is an automated email from the ASF dual-hosted git repository.
jiayu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/sedona.git
The following commit(s) were added to refs/heads/master by this push:
new e4c7d7bce [SEDONA-562] Add a native DataFrame based GeoJSON reader and
writer (#1432)
e4c7d7bce is described below
commit e4c7d7bcedca7bae1a4da81ec99336fbb75ca556
Author: Jia Yu <[email protected]>
AuthorDate: Tue May 28 00:26:13 2024 -0700
[SEDONA-562] Add a native DataFrame based GeoJSON reader and writer (#1432)
* Push the code
* Minor tweak
---
docs/api/sql/Constructor.md | 3 +
docs/api/sql/Function.md | 3 +
docs/tutorial/rdd.md | 8 +-
docs/tutorial/sql.md | 205 +++++++++++---
...org.apache.spark.sql.sources.DataSourceRegister | 1 +
.../sedona_sql/io/geojson/GeoJSONFileFormat.scala | 186 +++++++++++++
.../io/geojson/GeoJSONJacksonGenerator.scala | 301 +++++++++++++++++++++
.../io/geojson/GeoJSONOutputWriter.scala | 60 ++++
.../sql/sedona_sql/io/geojson/GeoJSONUtils.scala | 117 ++++++++
.../sedona_sql/io/geojson/SparkCompatUtil.scala | 148 ++++++++++
.../src/test/resources/geojson/core-item.json | 1 +
.../geojson/geojson_feature-collection.json | 76 ++++++
spark/common/src/test/resources/geojson/test1.json | 39 +++
.../common/src/test/resources/geojson/test12.json | 41 +++
spark/common/src/test/resources/geojson/test2.json | 21 ++
.../org/apache/sedona/sql/geojsonIOTests.scala | 224 +++++++++++++++
16 files changed, 1401 insertions(+), 33 deletions(-)
diff --git a/docs/api/sql/Constructor.md b/docs/api/sql/Constructor.md
index 6498e5eb4..efe6e62ca 100644
--- a/docs/api/sql/Constructor.md
+++ b/docs/api/sql/Constructor.md
@@ -116,6 +116,9 @@ POLYGON ((0.703125 0.87890625, 0.703125 1.0546875,
1.0546875 1.0546875, 1.054687
## ST_GeomFromGeoJSON
+!!!note
+ This method is not recommended. Please use [Sedona GeoJSON data
source](../../tutorial/sql.md#load-geojson-data) to read GeoJSON files.
+
Introduction: Construct a Geometry from GeoJson
Format: `ST_GeomFromGeoJSON (GeoJson: String)`
diff --git a/docs/api/sql/Function.md b/docs/api/sql/Function.md
index 7406fe133..814c9c376 100644
--- a/docs/api/sql/Function.md
+++ b/docs/api/sql/Function.md
@@ -344,6 +344,9 @@ POINT ZM(1 1 1 1)
## ST_AsGeoJSON
+!!!note
+ This method is not recommended. Please use [Sedona GeoJSON data
source](../../tutorial/sql.md#save-as-geojson) to write GeoJSON files.
+
Introduction: Return the [GeoJSON](https://geojson.org/) string representation
of a geometry
Format: `ST_AsGeoJSON (A: Geometry)`
diff --git a/docs/tutorial/rdd.md b/docs/tutorial/rdd.md
index 27cb25e54..65fe0c859 100644
--- a/docs/tutorial/rdd.md
+++ b/docs/tutorial/rdd.md
@@ -76,6 +76,9 @@ Use the following code to create a SpatialRDD
#### From GeoJSON
+!!!note
+ Reading GeoJSON using SpatialRDD is not recommended. Please use [Sedona
SQL and DataFrame API](sql.md#load-geojson-data) to read GeoJSON files.
+
Geometries in GeoJSON are similar to those in WKT/WKB. However, a GeoJSON file must be broken into multiple lines.
Suppose we have a `polygon.json` GeoJSON file at Path `/Download/polygon.json`
as follows:
@@ -200,7 +203,7 @@ var spatialRDD = Adapter.toSpatialRdd(spatialDf, "checkin")
"checkin" is the name of the geometry column
-For WKT/WKB/GeoJSON data, please use ==ST_GeomFromWKT / ST_GeomFromWKB /
ST_GeomFromGeoJSON== instead.
+For WKT/WKB data, please use ==ST_GeomFromWKT / ST_GeomFromWKB== instead.
## Transform the Coordinate Reference System
@@ -978,6 +981,9 @@ objectRDD.saveAsWKB("hdfs://PATH")
#### Save to distributed GeoJSON text file
+!!!note
+ Saving GeoJSON using SpatialRDD is not recommended. Please use [Sedona
SQL and DataFrame API](sql.md#save-as-geojson) to write GeoJSON files.
+
Use the following code to save a SpatialRDD as a distributed GeoJSON text file:
```scala
diff --git a/docs/tutorial/sql.md b/docs/tutorial/sql.md
index 6d28998a2..f183a1270 100644
--- a/docs/tutorial/sql.md
+++ b/docs/tutorial/sql.md
@@ -292,48 +292,176 @@ root
!!!note
SedonaSQL provides lots of functions to create a Geometry column,
please read [SedonaSQL constructor API](../api/sql/Constructor.md).
-## Load GeoJSON using Spark JSON Data Source
+## Load GeoJSON Data
+
+Since `v1.6.1`, Sedona supports reading GeoJSON files using the `geojson` data source. It is designed to handle JSON files that use the [GeoJSON format](https://datatracker.ietf.org/doc/html/rfc7946) for their geometries.
+
+This includes SpatioTemporal Asset Catalog (STAC) files, GeoJSON features, GeoJSON feature collections, and other variations.
+The key functionality lies in the way 'geometry' fields are processed: these
are specifically read as Sedona's `GeometryUDT` type, ensuring integration with
Sedona's suite of spatial functions.
+
+### Key features
+
+- Broad Support: The reader and writer are versatile, supporting all
GeoJSON-formatted files, including STAC files, feature collections, and more.
+- Geometry Transformation: When reading, fields named 'geometry' are
automatically converted from GeoJSON format to Sedona's `GeometryUDT` type and
vice versa when writing.
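To make the geometry transformation concrete, here is a minimal, Spark-free sketch of the idea: a GeoJSON geometry object maps to a geometry value, shown here as WKT text, which is the representation Sedona displays. The `geojson_to_wkt` helper below is purely illustrative and is not part of Sedona's API; the data source performs this conversion internally using JTS.

```python
import json

# Illustrative only: a tiny GeoJSON-to-WKT conversion for Point and LineString.
# Sedona's geojson data source does this internally via JTS, not with this code.
def geojson_to_wkt(geom):
    if geom["type"] == "Point":
        x, y = geom["coordinates"]
        return f"POINT ({x:g} {y:g})"
    if geom["type"] == "LineString":
        pts = ", ".join(f"{x:g} {y:g}" for x, y in geom["coordinates"])
        return f"LINESTRING ({pts})"
    raise NotImplementedError(geom["type"])

feature = json.loads(
    '{"type":"Feature","geometry":{"type":"Point","coordinates":[102.0,0.5]},'
    '"properties":{"prop0":"value0"}}'
)
print(geojson_to_wkt(feature["geometry"]))  # POINT (102 0.5)
```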
+
+### Load MultiLine GeoJSON FeatureCollection
+
+Suppose we have a GeoJSON FeatureCollection file as follows.
+The entire file is treated as a single GeoJSON FeatureCollection object.
+The multiline format is preferable when files need to be human-readable or manually edited.
+
+```json
+{ "type": "FeatureCollection",
+ "features": [
+ { "type": "Feature",
+ "geometry": {"type": "Point", "coordinates": [102.0, 0.5]},
+ "properties": {"prop0": "value0"}
+ },
+ { "type": "Feature",
+ "geometry": {
+ "type": "LineString",
+ "coordinates": [
+ [102.0, 0.0], [103.0, 1.0], [104.0, 0.0], [105.0, 1.0]
+ ]
+ },
+ "properties": {
+ "prop0": "value1",
+ "prop1": 0.0
+ }
+ },
+ { "type": "Feature",
+ "geometry": {
+ "type": "Polygon",
+ "coordinates": [
+ [ [100.0, 0.0], [101.0, 0.0], [101.0, 1.0],
+ [100.0, 1.0], [100.0, 0.0] ]
+ ]
+ },
+ "properties": {
+ "prop0": "value2",
+ "prop1": {"this": "that"}
+ }
+ }
+ ]
+}
+```
+
+Set the `multiLine` option to `true` to read multiline GeoJSON files.
+
+=== "Python"
+
+ ```python
+ df = (sedona.read.format("geojson").option("multiLine", "true").load("PATH/TO/MYFILE.json")
+       .selectExpr("explode(features) as features")  # Explode the envelope to get one feature per row.
+       .select("features.*")  # Unpack the features struct.
+       .withColumn("prop0", f.expr("properties['prop0']"))
+       .drop("properties").drop("type"))
+
+ df.show()
+ df.printSchema()
+ ```
+
+=== "Scala"
+
+ ```scala
+ val df = sedona.read.format("geojson").option("multiLine", "true").load("PATH/TO/MYFILE.json")
+ val parsedDf = df.selectExpr("explode(features) as features").select("features.*")
+   .withColumn("prop0", expr("properties['prop0']")).drop("properties").drop("type")
+
+ parsedDf.show()
+ parsedDf.printSchema()
+ ```
+
+=== "Java"
+
+ ```java
+ Dataset<Row> df = sedona.read().format("geojson").option("multiLine", "true").load("PATH/TO/MYFILE.json")
+     .selectExpr("explode(features) as features") // Explode the envelope to get one feature per row.
+     .select("features.*") // Unpack the features struct.
+     .withColumn("prop0", expr("properties['prop0']")).drop("properties").drop("type");
+
+ df.show();
+ df.printSchema();
+ ```
+
+The output is as follows:
+
+```
++--------------------+------+
+| geometry| prop0|
++--------------------+------+
+| POINT (102 0.5)|value0|
+|LINESTRING (102 0...|value1|
+|POLYGON ((100 0, ...|value2|
++--------------------+------+
+
+root
+ |-- geometry: geometry (nullable = false)
+ |-- prop0: string (nullable = true)
+
+```
+
+### Load Single Line GeoJSON Features
+
+Suppose we have a single-line GeoJSON Features dataset as follows. Each line
is a single GeoJSON Feature.
+This format is efficient for processing large datasets where each line is a
separate, self-contained GeoJSON object.
+
+```json
+{"type":"Feature","geometry":{"type":"Point","coordinates":[102.0,0.5]},"properties":{"prop0":"value0"}}
+{"type":"Feature","geometry":{"type":"LineString","coordinates":[[102.0,0.0],[103.0,1.0],[104.0,0.0],[105.0,1.0]]},"properties":{"prop0":"value1"}}
+{"type":"Feature","geometry":{"type":"Polygon","coordinates":[[[100.0,0.0],[101.0,0.0],[101.0,1.0],[100.0,1.0],[100.0,0.0]]]},"properties":{"prop0":"value2"}}
+```
-Spark SQL's built-in JSON data source supports reading GeoJSON data.
-To ensure proper parsing of the geometry property, we can define a schema with
the geometry property set to type 'string'.
-This prevents Spark from interpreting the property and allows us to use the
ST_GeomFromGeoJSON function for accurate geometry parsing.
+By default, when the `multiLine` option is not specified, Sedona reads GeoJSON files as single-line GeoJSON.
+
+=== "Python"
+
+ ```python
+ df = (sedona.read.format("geojson").load("PATH/TO/MYFILE.json")
+       .withColumn("prop0", f.expr("properties['prop0']"))
+       .drop("properties").drop("type"))
+
+ df.show()
+ df.printSchema()
+ ```
=== "Scala"
```scala
- val schema = "type string, crs string, totalFeatures long, features
array<struct<type string, geometry string, properties map<string, string>>>"
- sedona.read.schema(schema).json(geojson_path)
- .selectExpr("explode(features) as features") // Explode the
envelope to get one feature per row.
- .select("features.*") // Unpack the features struct.
- .withColumn("geometry", expr("ST_GeomFromGeoJSON(geometry)"))
// Convert the geometry string.
- .printSchema()
+ val df = sedona.read.format("geojson").load("PATH/TO/MYFILE.json")
+     .withColumn("prop0", expr("properties['prop0']")).drop("properties").drop("type")
+
+ df.show()
+ df.printSchema()
```
=== "Java"
```java
- String schema = "type string, crs string, totalFeatures long, features
array<struct<type string, geometry string, properties map<string, string>>>";
- sedona.read.schema(schema).json(geojson_path)
- .selectExpr("explode(features) as features") // Explode the
envelope to get one feature per row.
- .select("features.*") // Unpack the features struct.
- .withColumn("geometry", expr("ST_GeomFromGeoJSON(geometry)"))
// Convert the geometry string.
- .printSchema();
+ Dataset<Row> df = sedona.read().format("geojson").load("PATH/TO/MYFILE.json")
+     .withColumn("prop0", expr("properties['prop0']")).drop("properties").drop("type");
+
+ df.show();
+ df.printSchema();
```
-=== "Python"
+The output is as follows:
- ```python
- schema = "type string, crs string, totalFeatures long, features
array<struct<type string, geometry string, properties map<string, string>>>";
- (sedona.read.json(geojson_path, schema=schema)
- .selectExpr("explode(features) as features") # Explode the
envelope to get one feature per row.
- .select("features.*") # Unpack the features struct.
- .withColumn("geometry", f.expr("ST_GeomFromGeoJSON(geometry)"))
# Convert the geometry string.
- .printSchema())
- ```
+```
++--------------------+------+
+| geometry| prop0|
++--------------------+------+
+| POINT (102 0.5)|value0|
+|LINESTRING (102 0...|value1|
+|POLYGON ((100 0, ...|value2|
++--------------------+------+
+
+root
+ |-- geometry: geometry (nullable = false)
+ |-- prop0: string (nullable = true)
+```
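A quick way to see why the single-line layout splits cleanly across partitions: every line is a complete JSON document that parses on its own. The snippet below is a plain-Python illustration of that property and does not use Sedona or Spark at all.

```python
import json

# Each line of a single-line GeoJSON file is a complete, self-contained Feature,
# so any line can be parsed independently of the others.
lines = [
    '{"type":"Feature","geometry":{"type":"Point","coordinates":[102.0,0.5]},"properties":{"prop0":"value0"}}',
    '{"type":"Feature","geometry":{"type":"LineString","coordinates":[[102.0,0.0],[103.0,1.0],[104.0,0.0],[105.0,1.0]]},"properties":{"prop0":"value1"}}',
]
features = [json.loads(line) for line in lines]
assert all(f["type"] == "Feature" for f in features)
print([f["properties"]["prop0"] for f in features])  # ['value0', 'value1']
```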
-## Load Shapefile and GeoJSON using SpatialRDD
+## Load Shapefile using SpatialRDD
-Shapefile and GeoJSON can be loaded by SpatialRDD and converted to DataFrame
using Adapter. Please read [Load
SpatialRDD](rdd.md#create-a-generic-spatialrdd) and [DataFrame <->
RDD](#convert-between-dataframe-and-spatialrdd).
+Shapefile can be loaded by SpatialRDD and converted to DataFrame using
Adapter. Please read [Load SpatialRDD](rdd.md#create-a-generic-spatialrdd) and
[DataFrame <-> RDD](#convert-between-dataframe-and-spatialrdd).
## Load GeoParquet
@@ -959,8 +1087,21 @@ SELECT ST_AsText(countyshape)
FROM polygondf
```
-!!!note
- ST_AsGeoJSON is also available. We would like to invite you to
contribute more functions
+## Save as GeoJSON
+
+Since `v1.6.1`, the GeoJSON data source in Sedona can be used to save a
Spatial DataFrame to a single-line JSON file, with geometries written in
GeoJSON format.
+
+```python
+df.write.format("geojson").save("YOUR/PATH.json")
+```
+
+The generated file will have the following structure:
+
+```json
+{"type":"Feature","geometry":{"type":"Point","coordinates":[102.0,0.5]},"properties":{"prop0":"value0"}}
+{"type":"Feature","geometry":{"type":"LineString","coordinates":[[102.0,0.0],[103.0,1.0],[104.0,0.0],[105.0,1.0]]},"properties":{"prop0":"value1"}}
+{"type":"Feature","geometry":{"type":"Polygon","coordinates":[[[100.0,0.0],[101.0,0.0],[101.0,1.0],[100.0,1.0],[100.0,0.0]]]},"properties":{"prop0":"value2"}}
+```
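Conceptually, the writer maps each DataFrame row to one single-line Feature: the geometry column becomes the `geometry` member, and the remaining columns are wrapped in a `properties` object (see `GeoJSONJacksonGenerator.scala` in this commit). The sketch below imitates that mapping in plain Python; `row_to_feature` is illustrative only and is not part of Sedona.

```python
import json

# Rough illustration of the row-to-Feature mapping performed by the writer
# (the actual implementation lives in GeoJSONJacksonGenerator.scala).
def row_to_feature(row, geometry_column="geometry"):
    geometry = row.get(geometry_column)
    properties = {k: v for k, v in row.items() if k != geometry_column}
    return json.dumps(
        {"type": "Feature", "geometry": geometry, "properties": properties},
        separators=(",", ":"),  # compact, single-line output
    )

row = {"geometry": {"type": "Point", "coordinates": [102.0, 0.5]}, "prop0": "value0"}
print(row_to_feature(row))  # reproduces the first line of the sample output above
```

The data source also accepts a `geometry.column` option (default `geometry`) to choose which column is written as the Feature geometry, as implemented in `GeoJSONFileFormat.prepareWrite`.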
## Save GeoParquet
@@ -996,9 +1137,9 @@ df.write.format("geoparquet")
The value of `geoparquet.crs` and `geoparquet.crs.<column_name>` can be one of
the following:
-* `"null"`: Explicitly setting `crs` field to `null`. This is the default
behavior.
-* `""` (empty string): Omit the `crs` field. This implies that the CRS is
[OGC:CRS84](https://www.opengis.net/def/crs/OGC/1.3/CRS84) for CRS-aware
implementations.
-* `"{...}"` (PROJJSON string): The `crs` field will be set as the PROJJSON
object representing the Coordinate Reference System (CRS) of the geometry. You
can find the PROJJSON string of a specific CRS from here: https://epsg.io/
(click the JSON option at the bottom of the page). You can also customize your
PROJJSON string as needed.
+- `"null"`: Explicitly setting `crs` field to `null`. This is the default
behavior.
+- `""` (empty string): Omit the `crs` field. This implies that the CRS is
[OGC:CRS84](https://www.opengis.net/def/crs/OGC/1.3/CRS84) for CRS-aware
implementations.
+- `"{...}"` (PROJJSON string): The `crs` field will be set as the PROJJSON
object representing the Coordinate Reference System (CRS) of the geometry. You
can find the PROJJSON string of a specific CRS from here: https://epsg.io/
(click the JSON option at the bottom of the page). You can also customize your
PROJJSON string as needed.
Please note that Sedona currently cannot set/get a PROJJSON string to/from a CRS. Its GeoParquet reader will ignore the PROJJSON metadata, and you will have to set your CRS via [`ST_SetSRID`](../api/sql/Function.md#st_setsrid) after reading the file.
Its GeoParquet writer will not use the SRID field of a geometry, so you will always have to set the `geoparquet.crs` option manually when writing the file if you want to write a meaningful CRS field.
diff --git
a/spark/common/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister
b/spark/common/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister
index 23e950440..a71664cc3 100644
---
a/spark/common/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister
+++
b/spark/common/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister
@@ -1 +1,2 @@
org.apache.spark.sql.sedona_sql.io.raster.RasterFileFormat
+org.apache.spark.sql.sedona_sql.io.geojson.GeoJSONFileFormat
diff --git
a/spark/common/src/main/scala/org/apache/spark/sql/sedona_sql/io/geojson/GeoJSONFileFormat.scala
b/spark/common/src/main/scala/org/apache/spark/sql/sedona_sql/io/geojson/GeoJSONFileFormat.scala
new file mode 100644
index 000000000..894e68786
--- /dev/null
+++
b/spark/common/src/main/scala/org/apache/spark/sql/sedona_sql/io/geojson/GeoJSONFileFormat.scala
@@ -0,0 +1,186 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.sedona_sql.io.geojson
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.{FileStatus, Path}
+import org.apache.hadoop.mapreduce.{Job, TaskAttemptContext}
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.ExprUtils
+import org.apache.spark.sql.catalyst.json._
+import org.apache.spark.sql.catalyst.util.CompressionCodecs
+import org.apache.spark.sql.execution.datasources._
+import org.apache.spark.sql.execution.datasources.json.JsonDataSource
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.sedona_sql.UDT.{GeometryUDT, RasterUDT}
+import org.apache.spark.sql.sources._
+import org.apache.spark.sql.types._
+import org.apache.spark.util.SerializableConfiguration
+
+/**
+ * This is taken from
[[org.apache.spark.sql.execution.datasources.json.JsonFileFormat]] with slight
modification to
+ * support GeoJSON and read/write geometry values.
+ */
+class GeoJSONFileFormat extends TextBasedFileFormat with DataSourceRegister {
+ override val shortName: String = "geojson"
+
+ override def isSplitable(
+ sparkSession: SparkSession,
+ options: Map[String, String],
+ path: Path): Boolean = {
+ val parsedOptions = new JSONOptionsInRead(
+ options,
+ sparkSession.sessionState.conf.sessionLocalTimeZone,
+ sparkSession.sessionState.conf.columnNameOfCorruptRecord)
+ val jsonDataSource = JsonDataSource(parsedOptions)
+ jsonDataSource.isSplitable && super.isSplitable(sparkSession, options,
path)
+ }
+
+ override def inferSchema(
+ sparkSession: SparkSession,
+ options: Map[String, String],
+ files: Seq[FileStatus]): Option[StructType] = {
+
+ val parsedOptions = new JSONOptionsInRead(
+ options,
+ sparkSession.sessionState.conf.sessionLocalTimeZone,
+ sparkSession.sessionState.conf.columnNameOfCorruptRecord)
+
+ // Use existing logic to infer the full schema first
+ val fullSchemaOption = JsonDataSource(parsedOptions).inferSchema(
+ sparkSession, files, parsedOptions)
+
+ fullSchemaOption.map { fullSchema =>
+ // Replace 'geometry' field type with GeometryUDT
+ val newFields = GeoJSONUtils.updateGeometrySchema(fullSchema,
GeometryUDT)
+ StructType(newFields)
+ }
+ }
+
+ override def prepareWrite(
+ sparkSession: SparkSession,
+ job: Job,
+ options: Map[String, String],
+ dataSchema: StructType): OutputWriterFactory = {
+
+ val conf = job.getConfiguration
+ val parsedOptions = new JSONOptions(
+ options,
+ sparkSession.sessionState.conf.sessionLocalTimeZone,
+ sparkSession.sessionState.conf.columnNameOfCorruptRecord)
+ parsedOptions.compressionCodec.foreach { codec =>
+ CompressionCodecs.setCodecConfiguration(conf, codec)
+ }
+ val geometryColumnName = options.getOrElse("geometry.column", "geometry")
+ SparkCompatUtil.findNestedField(dataSchema, geometryColumnName.split('.'),
resolver = SQLConf.get.resolver) match {
+ case Some(StructField(_, dataType, _, _)) =>
+ if (!dataType.acceptsType(GeometryUDT)) {
+ throw new IllegalArgumentException(s"$geometryColumnName is not a
geometry column")
+ }
+ case None => throw new IllegalArgumentException(s"Column
$geometryColumnName not found in the schema")
+ }
+
+ new OutputWriterFactory {
+ override def newInstance(
+ path: String,
+ dataSchema: StructType,
+ context: TaskAttemptContext): OutputWriter = {
+ new GeoJSONOutputWriter(path, parsedOptions, dataSchema,
geometryColumnName, context)
+ }
+
+ override def getFileExtension(context: TaskAttemptContext): String = {
+ ".json" + CodecStreams.getCompressionExtension(context)
+ }
+ }
+ }
+
+ override def buildReader(
+ sparkSession: SparkSession,
+ dataSchema: StructType,
+ partitionSchema: StructType,
+ requiredSchema: StructType,
+ filters: Seq[Filter],
+ options: Map[String, String],
+ hadoopConf: Configuration): PartitionedFile => Iterator[InternalRow] = {
+
+ val broadcastedHadoopConf =
+ sparkSession.sparkContext.broadcast(new
SerializableConfiguration(hadoopConf))
+
+ val parsedOptions = new JSONOptionsInRead(
+ options,
+ sparkSession.sessionState.conf.sessionLocalTimeZone,
+ sparkSession.sessionState.conf.columnNameOfCorruptRecord)
+
+ val actualSchema =
+ StructType(requiredSchema.filterNot(_.name ==
parsedOptions.columnNameOfCorruptRecord))
+ ExprUtils.verifyColumnNameOfCorruptRecord(dataSchema,
parsedOptions.columnNameOfCorruptRecord)
+
+ val alteredSchema = GeoJSONUtils.updateGeometrySchema(actualSchema,
StringType)
+
+ if (requiredSchema.length == 1 &&
+ requiredSchema.head.name == parsedOptions.columnNameOfCorruptRecord) {
+ throw new IllegalArgumentException(
+ "referenced columns only include the internal corrupt record column,
this is not allowed")
+ }
+
+ (file: PartitionedFile) => {
+ val parser = SparkCompatUtil.constructJacksonParser(
+ alteredSchema,
+ parsedOptions,
+ allowArrayAsStructs = true)
+ val dataSource = JsonDataSource(parsedOptions)
+
+ dataSource.readFile(
+ broadcastedHadoopConf.value.value,
+ file,
+ parser,
+ actualSchema).map(row => {
+ val newRow = GeoJSONUtils.convertGeoJsonToGeometry(row, alteredSchema)
+ newRow
+ })
+ }
+ }
+
+
+ override def toString: String = "GEOJSON"
+
+ override def hashCode(): Int = getClass.hashCode()
+
+ override def equals(other: Any): Boolean =
other.isInstanceOf[GeoJSONFileFormat]
+
+ override def supportDataType(dataType: DataType): Boolean = dataType match {
+
+ case _: AtomicType => true
+
+ case st: StructType => st.forall { f => supportDataType(f.dataType) }
+
+ case ArrayType(elementType, _) => supportDataType(elementType)
+
+ case MapType(keyType, valueType, _) =>
+ supportDataType(keyType) && supportDataType(valueType)
+
+ case GeometryUDT => true
+ case RasterUDT => false
+ case udt: UserDefinedType[_] => supportDataType(udt.sqlType)
+
+ case _: NullType => true
+
+ case _ => false
+ }
+}
diff --git
a/spark/common/src/main/scala/org/apache/spark/sql/sedona_sql/io/geojson/GeoJSONJacksonGenerator.scala
b/spark/common/src/main/scala/org/apache/spark/sql/sedona_sql/io/geojson/GeoJSONJacksonGenerator.scala
new file mode 100644
index 000000000..1c8a84926
--- /dev/null
+++
b/spark/common/src/main/scala/org/apache/spark/sql/sedona_sql/io/geojson/GeoJSONJacksonGenerator.scala
@@ -0,0 +1,301 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.sedona_sql.io.geojson
+
+import com.fasterxml.jackson.core._
+import com.fasterxml.jackson.core.util.DefaultPrettyPrinter
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.SpecializedGetters
+import org.apache.spark.sql.catalyst.json.JSONOptions
+import org.apache.spark.sql.catalyst.util.LegacyDateFormats.FAST_DATE_FORMAT
+import org.apache.spark.sql.catalyst.util._
+import org.apache.spark.sql.sedona_sql.UDT.GeometryUDT
+import
org.apache.spark.sql.sedona_sql.io.geojson.GeoJSONUtils.geometryToGeoJson
+import org.apache.spark.sql.types._
+import org.locationtech.jts.io.WKTWriter
+
+import java.io.Writer
+
+/**
+ * `GeoJSONJacksonGenerator` is taken from [[JacksonGenerator]] with slight modifications to write rows as
+ * GeoJSON Feature objects. It handles geometry columns properly and wraps all other columns in a `properties` field.
+ */
+private[sql] class GeoJSONJacksonGenerator(
+ schema: StructType,
+ geometryColumnName: String,
+ writer: Writer,
+ options: JSONOptions) {
+ // A `ValueWriter` is responsible for writing a field of an `InternalRow` to
appropriate
+ // JSON data. Here we are using `SpecializedGetters` rather than
`InternalRow` so that
+ // we can directly access data in `ArrayData` without the help of
`SpecificMutableRow`.
+ private type ValueWriter = (SpecializedGetters, Int) => Unit
+
+ // `ValueWriter`s for all fields of the schema
+ private lazy val rootFieldWriters: Array[ValueWriter] =
+ schema.map(field => makeWriter(field.dataType, field.name)).toArray
+
+ // If there is a field named `properties` and it is a struct or map, we will write all fields to the
+ // top level of the GeoJSON object; otherwise, we'll wrap all fields in a `properties` field.
+ private val hasPropertiesField = schema.fields.exists { field =>
+ field.name == "properties" && (field.dataType.isInstanceOf[StructType] ||
field.dataType.isInstanceOf[MapType])
+ }
+
+ private val gen = {
+ val generator = new
JsonFactory().createGenerator(writer).setRootValueSeparator(null)
+ if (options.pretty) {
+ generator.setPrettyPrinter(new DefaultPrettyPrinter(""))
+ }
+ generator
+ }
+
+ private val lineSeparator: String = options.lineSeparatorInWrite
+
+ private val timestampFormatter = SparkCompatUtil.constructTimestampFormatter(
+ options,
+ options.zoneId,
+ options.locale,
+ legacyFormat = FAST_DATE_FORMAT,
+ isParsing = false)
+ private val dateFormatter = SparkCompatUtil.constructDateFormatter(
+ options,
+ options.zoneId,
+ options.locale,
+ legacyFormat = FAST_DATE_FORMAT,
+ isParsing = false)
+
+ private val wktWriter = new WKTWriter()
+ private var currentGeoJsonString: Option[String] = None
+
+ private def makeWriter(dataType: DataType, name: String): ValueWriter =
dataType match {
+ case NullType =>
+ (row: SpecializedGetters, ordinal: Int) =>
+ gen.writeNull()
+
+ case BooleanType =>
+ (row: SpecializedGetters, ordinal: Int) =>
+ gen.writeBoolean(row.getBoolean(ordinal))
+
+ case ByteType =>
+ (row: SpecializedGetters, ordinal: Int) =>
+ gen.writeNumber(row.getByte(ordinal))
+
+ case ShortType =>
+ (row: SpecializedGetters, ordinal: Int) =>
+ gen.writeNumber(row.getShort(ordinal))
+
+ case IntegerType =>
+ (row: SpecializedGetters, ordinal: Int) =>
+ gen.writeNumber(row.getInt(ordinal))
+
+ case LongType =>
+ (row: SpecializedGetters, ordinal: Int) =>
+ gen.writeNumber(row.getLong(ordinal))
+
+ case FloatType =>
+ (row: SpecializedGetters, ordinal: Int) =>
+ gen.writeNumber(row.getFloat(ordinal))
+
+ case DoubleType =>
+ (row: SpecializedGetters, ordinal: Int) =>
+ gen.writeNumber(row.getDouble(ordinal))
+
+ case StringType =>
+ (row: SpecializedGetters, ordinal: Int) =>
+ gen.writeString(row.getUTF8String(ordinal).toString)
+
+ case TimestampType =>
+ (row: SpecializedGetters, ordinal: Int) =>
+ val timestampString = timestampFormatter.format(row.getLong(ordinal))
+ gen.writeString(timestampString)
+
+ case DateType =>
+ (row: SpecializedGetters, ordinal: Int) =>
+ val dateString = dateFormatter.format(row.getInt(ordinal))
+ gen.writeString(dateString)
+
+ case CalendarIntervalType =>
+ (row: SpecializedGetters, ordinal: Int) =>
+ gen.writeString(row.getInterval(ordinal).toString)
+
+ case BinaryType =>
+ (row: SpecializedGetters, ordinal: Int) =>
+ gen.writeBinary(row.getBinary(ordinal))
+
+ case dt: DecimalType =>
+ (row: SpecializedGetters, ordinal: Int) =>
+ gen.writeNumber(row.getDecimal(ordinal, dt.precision,
dt.scale).toJavaBigDecimal)
+
+ case st: StructType =>
+ val fieldWriters = st.map(field => makeWriter(field.dataType, name + "."
+ field.name))
+ (row: SpecializedGetters, ordinal: Int) =>
+ writeObject(writeFields(name, row.getStruct(ordinal, st.length), st,
fieldWriters))
+
+ case at: ArrayType =>
+ val elementWriter = makeWriter(at.elementType, name)
+ (row: SpecializedGetters, ordinal: Int) =>
+ writeArray(writeArrayData(row.getArray(ordinal), elementWriter))
+
+ case mt: MapType =>
+ val valueWriter = makeWriter(mt.valueType, name)
+ (row: SpecializedGetters, ordinal: Int) =>
+ writeObject(writeMapData(row.getMap(ordinal), mt, valueWriter))
+
+ case GeometryUDT =>
+ // Only non-primary geometry columns reach this writer; they are written as WKT strings into the properties object.
+ (row: SpecializedGetters, ordinal: Int) => {
+ val geom = GeometryUDT.deserialize(row.getBinary(ordinal))
+ val wkt = wktWriter.write(geom)
+ gen.writeString(wkt)
+ }
+
+ // For UDT values, they should be in the SQL type's corresponding value
type.
+ // We should not see values in the user-defined class at here.
+ // For example, VectorUDT's SQL type is an array of double. So, we should
expect that v is
+ // an ArrayData at here, instead of a Vector.
+ case t: UserDefinedType[_] =>
+ makeWriter(t.sqlType, name)
+
+ case _ =>
+ (row: SpecializedGetters, ordinal: Int) =>
+ val v = row.get(ordinal, dataType)
+ throw new IllegalArgumentException(s"Unsupported dataType:
${dataType.catalogString} " +
+ s"with value $v")
+ }
+
+ private def writeObject(f: => Unit): Unit = {
+ gen.writeStartObject()
+ f
+ gen.writeEndObject()
+ }
+
+ private def writeFields(name: String, row: InternalRow, schema: StructType,
fieldWriters: Seq[ValueWriter]): Unit = {
+ var i = 0
+ while (i < row.numFields) {
+ val field = schema(i)
+ val fullFieldName = if (name.isEmpty) field.name else
s"$name.${field.name}"
+ if (fullFieldName == geometryColumnName) {
+ // Primary geometry field; no need to write out the key. We save the geometry and write it
+ // out as the `geometry` field after we are done writing the properties.
+ if (!row.isNullAt(i)) {
+ val geoJson = geometryToGeoJson(row.getBinary(i))
+ currentGeoJsonString = Some(geoJson)
+ } else {
+ currentGeoJsonString = None
+ }
+ } else {
+ // properties field, simply write it out
+ if (hasPropertiesField && (fullFieldName == "type" || fullFieldName ==
"geometry")) {
+ // Ignore top-level type and geometry fields
+ } else {
+ if (!row.isNullAt(i)) {
+ gen.writeFieldName(field.name)
+ fieldWriters(i).apply(row, i)
+ } else if (!options.ignoreNullFields) {
+ gen.writeFieldName(field.name)
+ gen.writeNull()
+ }
+ }
+ }
+ i += 1
+ }
+ }
+
+ private def writeArray(f: => Unit): Unit = {
+ gen.writeStartArray()
+ f
+ gen.writeEndArray()
+ }
+
+ private def writeArrayData(
+ array: ArrayData, fieldWriter: ValueWriter): Unit = {
+ var i = 0
+ while (i < array.numElements()) {
+ if (!array.isNullAt(i)) {
+ fieldWriter.apply(array, i)
+ } else {
+ gen.writeNull()
+ }
+ i += 1
+ }
+ }
+
+ private def writeMapData(
+ map: MapData, mapType: MapType, fieldWriter: ValueWriter): Unit = {
+ val keyArray = map.keyArray()
+ val valueArray = map.valueArray()
+ var i = 0
+ while (i < map.numElements()) {
+ gen.writeFieldName(keyArray.get(i, mapType.keyType).toString)
+ if (!valueArray.isNullAt(i)) {
+ fieldWriter.apply(valueArray, i)
+ } else {
+ gen.writeNull()
+ }
+ i += 1
+ }
+ }
+
+ def close(): Unit = gen.close()
+
+ /**
+ * Transforms a single `InternalRow` into a JSON object using Jackson.
+ * The call is validated lazily through the first access of `rootFieldWriters`.
+ *
+ * @param row The row to convert
+ */
+ def write(row: InternalRow): Unit = {
+ currentGeoJsonString = None
+
+ gen.writeStartObject()
+ gen.writeStringField("type", "Feature")
+
+ if (hasPropertiesField) {
+ // Write all fields as top-level fields
+ writeFields(
+ name = "",
+ fieldWriters = rootFieldWriters,
+ row = row,
+ schema = schema)
+ } else {
+ // Wrap all fields in `properties` field
+ gen.writeFieldName("properties")
+ gen.writeStartObject()
+ writeFields(
+ name = "",
+ fieldWriters = rootFieldWriters,
+ row = row,
+ schema = schema)
+ gen.writeEndObject()
+ }
+
+ // write geometry
+ gen.writeFieldName("geometry")
+ currentGeoJsonString match {
+ case Some(geoJson) => gen.writeRawValue(geoJson)
+ case None => gen.writeNull()
+ }
+
+ // end of feature
+ gen.writeEndObject()
+ }
+
+ def writeLineEnding(): Unit = {
+ // Note that JSON uses a writer with UTF-8 charset. This string will be written out as UTF-8.
+ gen.writeRaw(lineSeparator)
+ }
+}
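The generator above emits one GeoJSON `Feature` object per output line: a `type` field first, the row's non-geometry fields (wrapped in a `properties` object unless the schema already has a top-level `properties` struct), and a raw `geometry` member last. A minimal sketch of consuming one such line with Python's standard `json` module (the sample values are hypothetical, not taken from this commit):

```python
import json

# One output line as produced by GeoJSONJacksonGenerator.write (sample values are made up)
line = ('{"type": "Feature", "properties": {"id": 3, "text": "test3"}, '
        '"geometry": {"type": "Point", "coordinates": [3.0, 3.0]}}')

feature = json.loads(line)
assert feature["type"] == "Feature"   # always written first by the generator
assert "geometry" in feature          # written last; null when no geometry is present
print(feature["properties"]["text"], feature["geometry"]["type"])
```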
diff --git a/spark/common/src/main/scala/org/apache/spark/sql/sedona_sql/io/geojson/GeoJSONOutputWriter.scala b/spark/common/src/main/scala/org/apache/spark/sql/sedona_sql/io/geojson/GeoJSONOutputWriter.scala
new file mode 100644
index 000000000..58239b12e
--- /dev/null
+++ b/spark/common/src/main/scala/org/apache/spark/sql/sedona_sql/io/geojson/GeoJSONOutputWriter.scala
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.sedona_sql.io.geojson
+
+import org.apache.hadoop.fs.Path
+import org.apache.hadoop.mapreduce.TaskAttemptContext
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.json.JSONOptions
+import org.apache.spark.sql.execution.datasources.{CodecStreams, OutputWriter}
+import org.apache.spark.sql.types.StructType
+
+import java.nio.charset.{Charset, StandardCharsets}
+
+/**
+ * This is taken from [[org.apache.spark.sql.execution.datasources.json.JsonOutputWriter]] with slight
+ * modifications to write GeoJSON output.
+ */
+class GeoJSONOutputWriter(
+ val path: String,
+ options: JSONOptions,
+ dataSchema: StructType,
+ geometryColumnName: String,
+ context: TaskAttemptContext)
+ extends OutputWriter with Logging {
+
+ private val encoding = options.encoding match {
+ case Some(charsetName) => Charset.forName(charsetName)
+ case None => StandardCharsets.UTF_8
+ }
+
+ private val writer = CodecStreams.createOutputStreamWriter(context, new Path(path), encoding)
+
+ // create the Generator without separator inserted between 2 records
+ private[this] val gen = new GeoJSONJacksonGenerator(dataSchema, geometryColumnName, writer, options)
+
+ override def write(row: InternalRow): Unit = {
+ gen.write(row)
+ gen.writeLineEnding()
+ }
+
+ override def close(): Unit = {
+ gen.close()
+ writer.close()
+ }
+}
diff --git a/spark/common/src/main/scala/org/apache/spark/sql/sedona_sql/io/geojson/GeoJSONUtils.scala b/spark/common/src/main/scala/org/apache/spark/sql/sedona_sql/io/geojson/GeoJSONUtils.scala
new file mode 100644
index 000000000..6ab7ba7c8
--- /dev/null
+++ b/spark/common/src/main/scala/org/apache/spark/sql/sedona_sql/io/geojson/GeoJSONUtils.scala
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.sedona_sql.io.geojson
+
+import org.apache.sedona.common.Constructors.geomFromText
+import org.apache.sedona.common.Functions.asGeoJson
+import org.apache.sedona.common.enums.FileDataSplitter
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.util.{ArrayData, GenericArrayData}
+import org.apache.spark.sql.sedona_sql.UDT.GeometryUDT
+import org.apache.spark.sql.types._
+import org.apache.spark.unsafe.types.UTF8String
+
+
+object GeoJSONUtils {
+
+ def updateGeometrySchema(schema: StructType, datatype: DataType): StructType = {
+ StructType(schema.fields.map {
+ case StructField("geometry", _, nullable, metadata) =>
+ StructField("geometry", datatype, nullable, metadata)
+ case StructField(name, dataType: StructType, nullable, metadata) =>
+ StructField(name, updateGeometrySchema(dataType, datatype), nullable, metadata)
+ case StructField(name, ArrayType(elementType: StructType, containsNull), nullable, metadata) =>
+ val updatedElementType = updateGeometrySchema(elementType, datatype)
+ StructField(name, ArrayType(updatedElementType, containsNull), nullable, metadata)
+ case other => other
+ })
+ }
+
+ def geoJsonToGeometry(geoJson: String): Array[Byte] = {
+ val geometry = geomFromText(geoJson, FileDataSplitter.GEOJSON)
+ GeometryUDT.serialize(geometry)
+ }
+
+ def geometryToGeoJson(geometryBinary: Array[Byte]): String = {
+ val geometry = GeometryUDT.deserialize(geometryBinary)
+ asGeoJson(geometry)
+ }
+
+ def handleArray(row: InternalRow, index: Int, elementType: DataType, toGeometry: Boolean): ArrayData = {
+ val arrayData = row.getArray(index)
+ if (arrayData == null || arrayData.numElements() == 0) return new GenericArrayData(Seq.empty[Any])
+
+ elementType match {
+ case structType: StructType =>
+ val convertedArray = (0 until arrayData.numElements()).map { i =>
+ if (!arrayData.isNullAt(i)) {
+ val innerRow = arrayData.getStruct(i, structType.fields.length)
+ if (toGeometry) {
+ convertGeoJsonToGeometry(innerRow, structType)
+ } else {
+ convertGeometryToGeoJson(innerRow, structType)
+ }
+ } else {
+ null
+ }
+ }
+ new GenericArrayData(convertedArray)
+ case _ => arrayData
+ }
+ }
+
+ def convertGeometryToGeoJson(row: InternalRow, schema: StructType): InternalRow = {
+ val newValues = new Array[Any](schema.fields.length)
+
+ schema.fields.zipWithIndex.foreach {
+ case (StructField("geometry", _: GeometryUDT, _, _), index) =>
+ val geometryBinary = row.getBinary(index)
+ newValues(index) = UTF8String.fromString(geometryToGeoJson(geometryBinary))
+ case (StructField(_, structType: StructType, _, _), index) =>
+ val nestedRow = row.getStruct(index, structType.fields.length)
+ newValues(index) = convertGeometryToGeoJson(nestedRow, structType)
+ case (StructField(_, arrayType: ArrayType, _, _), index) =>
+ newValues(index) = handleArray(row, index, arrayType.elementType, false)
+ case (_, index) =>
+ newValues(index) = row.get(index, schema.fields(index).dataType)
+ }
+
+ InternalRow.fromSeq(newValues)
+ }
+
+
+ def convertGeoJsonToGeometry(row: InternalRow, schema: StructType): InternalRow = {
+ val newValues = new Array[Any](schema.fields.length)
+
+ schema.fields.zipWithIndex.foreach {
+ case (StructField("geometry", StringType, _, _), index) =>
+ val geometryGeoJson = row.getString(index)
+ newValues(index) = geoJsonToGeometry(geometryGeoJson)
+ case (StructField(_, structType: StructType, _, _), index) =>
+ val nestedRow = row.getStruct(index, structType.fields.length)
+ newValues(index) = convertGeoJsonToGeometry(nestedRow, structType)
+ case (StructField(_, arrayType: ArrayType, _, _), index) =>
+ newValues(index) = handleArray(row, index, arrayType.elementType, true)
+ case (_, index) =>
+ newValues(index) = row.get(index, schema.fields(index).dataType)
+ }
+
+ InternalRow.fromSeq(newValues)
+ }
+
+}
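`updateGeometrySchema` above recursively rewrites the type of every field named `geometry`, including fields nested inside structs and inside arrays of structs. The recursion can be sketched over a plain list-of-pairs schema in Python (the data layout here is an illustrative stand-in for Spark's `StructType`, not Sedona code):

```python
def update_geometry_schema(schema, new_type):
    """Recursively replace the type of every field named 'geometry'.

    `schema` is a list of (name, type) pairs; a type is a string,
    a nested list of pairs (struct), or ("array", element_type).
    """
    out = []
    for name, ftype in schema:
        if name == "geometry":
            out.append((name, new_type))                    # swap in the new type
        elif isinstance(ftype, list):                       # nested struct: recurse
            out.append((name, update_geometry_schema(ftype, new_type)))
        elif isinstance(ftype, tuple) and ftype[0] == "array" and isinstance(ftype[1], list):
            out.append((name, ("array", update_geometry_schema(ftype[1], new_type))))
        else:
            out.append((name, ftype))                       # leave other fields alone
    return out

schema = [("id", "long"),
          ("geometry", "string"),
          ("features", ("array", [("geometry", "string"), ("prop0", "string")]))]
print(update_geometry_schema(schema, "geometry_udt"))
```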
diff --git a/spark/common/src/main/scala/org/apache/spark/sql/sedona_sql/io/geojson/SparkCompatUtil.scala b/spark/common/src/main/scala/org/apache/spark/sql/sedona_sql/io/geojson/SparkCompatUtil.scala
new file mode 100644
index 000000000..3ce1df7d7
--- /dev/null
+++ b/spark/common/src/main/scala/org/apache/spark/sql/sedona_sql/io/geojson/SparkCompatUtil.scala
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.sedona_sql.io.geojson
+
+import org.apache.spark.sql.catalyst.analysis.Resolver
+import org.apache.spark.sql.catalyst.json.{JSONOptions, JacksonParser}
+import org.apache.spark.sql.catalyst.util.{DateFormatter, TimestampFormatter}
+import org.apache.spark.sql.catalyst.util.LegacyDateFormats.LegacyDateFormat
+import org.apache.spark.sql.types.{DataType, StructField, StructType}
+
+import scala.reflect.runtime.{universe => ru}
+import java.time.ZoneId
+import java.util.Locale
+
+/**
+ * Making the GeoJSONFileFormat and GeoJSONOutputWriter classes compatible with Spark 3.0.x.
+ */
+object SparkCompatUtil {
+ // Here we define our own findNestedField method instead of using the
+ // StructType.findNestedField method, for compatibility with Spark 3.0.x.
+ def findNestedField(schema: StructType, path: Array[String], resolver: Resolver): Option[StructField] = {
+ path match {
+ case Array(part) =>
+ schema.find(f => resolver(f.name, part))
+ case Array(head, tail @ _*) =>
+ schema.find(f => resolver(f.name, head)).flatMap { f =>
+ f.dataType match {
+ case st: StructType => findNestedField(st, tail.toArray, resolver)
+ case _ => None
+ }
+ }
+ case _ => None
+ }
+ }
+
+ def constructTimestampFormatter(
+ options: JSONOptions,
+ zoneId: ZoneId,
+ locale: Locale,
+ legacyFormat: LegacyDateFormat,
+ isParsing: Boolean
+ ): TimestampFormatter = {
+ val mirror = ru.runtimeMirror(getClass.getClassLoader)
+ val timestampFormatTerm = ru.typeOf[JSONOptions].decl(ru.TermName("timestampFormat"))
+ val timestampFormatInWriteTerm = ru.typeOf[JSONOptions].decl(ru.TermName("timestampFormatInWrite"))
+ val instanceMirror = mirror.reflect(options)
+ val format = if (timestampFormatTerm.isMethod) {
+ instanceMirror.reflectMethod(timestampFormatTerm.asMethod).apply().asInstanceOf[String]
+ } else if (timestampFormatInWriteTerm.isMethod) {
+ instanceMirror.reflectMethod(timestampFormatInWriteTerm.asMethod).apply().asInstanceOf[String]
+ } else {
+ throw new Exception("Neither timestampFormat nor timestampFormatInWrite found in JSONOptions")
+ }
+ TimestampFormatter(
+ format,
+ zoneId,
+ locale,
+ legacyFormat,
+ isParsing)
+ }
+
+ def constructDateFormatter(
+ options: JSONOptions,
+ zoneId: ZoneId,
+ locale: Locale,
+ legacyFormat: LegacyDateFormat,
+ isParsing: Boolean): DateFormatter = {
+ val mirror = ru.runtimeMirror(getClass.getClassLoader)
+
+ val dateFormatTerm = ru.typeOf[JSONOptions].decl(ru.TermName("dateFormat"))
+ val dateFormatInWriteTerm = ru.typeOf[JSONOptions].decl(ru.TermName("dateFormatInWrite"))
+ val instanceMirror = mirror.reflect(options)
+ val format = if (dateFormatTerm.isMethod) {
+ instanceMirror.reflectMethod(dateFormatTerm.asMethod).apply().asInstanceOf[String]
+ } else if (dateFormatInWriteTerm.isMethod) {
+ instanceMirror.reflectMethod(dateFormatInWriteTerm.asMethod).apply().asInstanceOf[String]
+ } else {
+ throw new Exception("Neither dateFormat nor dateFormatInWrite found in JSONOptions")
+ }
+
+ val dateFormatterClass = mirror.staticClass("org.apache.spark.sql.catalyst.util.DateFormatter$")
+ val dateFormatterModule = mirror.staticModule(dateFormatterClass.fullName)
+ val dateFormatterInstance = mirror.reflectModule(dateFormatterModule)
+ val applyMethods = dateFormatterClass.toType.members.filter(_.name.decodedName.toString == "apply")
+ applyMethods.find(_.typeSignature.paramLists.flatten.size == 5) match {
+ case Some(applyMethod) =>
+ mirror.reflect(dateFormatterInstance.instance).reflectMethod(applyMethod.asMethod)(
+ format, zoneId, locale, legacyFormat, isParsing
+ ).asInstanceOf[DateFormatter]
+ case None =>
+ applyMethods.find { method =>
+ val params = method.typeSignature.paramLists.flatten
+ params.size == 4 &&
+ // get rid of the variant taking Option[String] as the first parameter
+ params.head.typeSignature <:< ru.typeOf[String]
+ } match {
+ case Some(applyMethod) =>
+ mirror.reflect(dateFormatterInstance.instance).reflectMethod(applyMethod.asMethod)(
+ format, locale, legacyFormat, isParsing
+ ).asInstanceOf[DateFormatter]
+ case None =>
+ throw new Exception("No suitable apply method found in DateFormatter")
+ }
+ }
+ }
+
+ def constructJacksonParser(
+ schema: DataType,
+ options: JSONOptions,
+ allowArrayAsStructs: Boolean
+ ): JacksonParser = {
+ val mirror = ru.runtimeMirror(getClass.getClassLoader)
+ val jacksonParserClass = mirror.staticClass("org.apache.spark.sql.catalyst.json.JacksonParser")
+
+ val constructorMethods = jacksonParserClass.toType.members.filter(_.isConstructor)
+
+ constructorMethods.find(_.typeSignature.paramLists.flatten.size == 3) match {
+ case Some(constructorMethod) =>
+ mirror.reflectClass(jacksonParserClass).reflectConstructor(constructorMethod.asMethod)(
+ schema, options, allowArrayAsStructs
+ ).asInstanceOf[JacksonParser]
+ case None =>
+ constructorMethods.find(_.typeSignature.paramLists.flatten.size == 4) match {
+ case Some(constructorMethod) =>
+ mirror.reflectClass(jacksonParserClass).reflectConstructor(constructorMethod.asMethod)(
+ schema, options, allowArrayAsStructs, Seq.empty
+ ).asInstanceOf[JacksonParser]
+ case None =>
+ throw new Exception("No suitable constructor found in JacksonParser")
+ }
+ }
+ }
+}
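SparkCompatUtil selects whichever `DateFormatter.apply` overload or `JacksonParser` constructor exists at runtime by counting parameters via reflection. The same arity-based dispatch can be sketched in Python with `inspect` (the `apply_v1`/`apply_v2` functions are hypothetical stand-ins for the differing Spark overloads, not real Spark APIs):

```python
import inspect

# Stand-ins for two API versions exposing different arities (hypothetical)
def apply_v1(fmt, zone, locale, legacy, parsing):   # 5-arg variant
    return ("v1", fmt)

def apply_v2(fmt, locale, legacy, parsing):         # 4-arg variant
    return ("v2", fmt)

def dispatch(candidates, args_by_arity):
    # Pick the first candidate whose parameter count we have arguments for,
    # mirroring SparkCompatUtil's paramLists.flatten.size checks.
    for fn in candidates:
        arity = len(inspect.signature(fn).parameters)
        if arity in args_by_arity:
            return fn(*args_by_arity[arity])
    raise RuntimeError("No suitable apply method found")

result = dispatch([apply_v1],
                  {5: ("yyyy-MM-dd", "UTC", "en", "legacy", False),
                   4: ("yyyy-MM-dd", "en", "legacy", False)})
print(result)  # → ('v1', 'yyyy-MM-dd')
```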
diff --git a/spark/common/src/test/resources/geojson/core-item.json b/spark/common/src/test/resources/geojson/core-item.json
new file mode 100644
index 000000000..b9bdbb014
--- /dev/null
+++ b/spark/common/src/test/resources/geojson/core-item.json
@@ -0,0 +1 @@
+{"stac_version": "1.0.0","stac_extensions": [],"type": "Feature","id": "20201211_223832_CS2","bbox": [172.91173669923782,1.3438851951615003,172.95469614953714,1.3690476620161975],"geometry": {"type": "Polygon","coordinates": [[[172.91173669923782,1.3438851951615003],[172.95469614953714,1.3438851951615003],[172.95469614953714,1.3690476620161975],[172.91173669923782,1.3690476620161975],[172.91173669923782,1.3438851951615003]]]},"properties": {"title": "Core Item","description": "A sample S [...]
diff --git a/spark/common/src/test/resources/geojson/geojson_feature-collection.json b/spark/common/src/test/resources/geojson/geojson_feature-collection.json
new file mode 100644
index 000000000..08803b13c
--- /dev/null
+++ b/spark/common/src/test/resources/geojson/geojson_feature-collection.json
@@ -0,0 +1,76 @@
+{
+ "type": "FeatureCollection",
+ "features": [
+ { "type": "Feature",
+ "geometry": {"type": "Point", "coordinates": [102.0, 0.5]},
+ "properties": {"prop0": "value0"}
+ },
+ { "type": "Feature",
+ "geometry": {
+ "type": "LineString",
+ "coordinates": [
+ [102.0, 0.0], [103.0, 1.0], [104.0, 0.0], [105.0, 1.0]
+ ]
+ },
+ "properties": {
+ "prop0": "value1",
+ "prop1": 0.0
+ }
+ },
+ { "type": "Feature",
+ "geometry": {
+ "type": "Polygon",
+ "coordinates": [
+ [ [100.0, 0.0], [101.0, 0.0], [101.0, 1.0],
+ [100.0, 1.0], [100.0, 0.0] ]
+ ]
+ },
+ "properties": {
+ "prop0": "value2",
+ "prop1": {"this": "that"}
+ }
+ },
+ {
+ "type": "Feature",
+ "geometry": {
+ "type": "MultiLineString",
+ "coordinates": [
+ [[170.0, 45.0], [180.0, 45.0]],
+ [[-180.0, 45.0], [-170.0, 45.0]]
+ ]
+ },
+ "properties": {
+ "prop0": "value3",
+ "prop1": {"this": "that"}
+ }
+ },
+ {
+ "type": "Feature",
+ "geometry": {
+ "type": "MultiPolygon",
+ "coordinates": [
+ [[[180.0, 40.0], [180.0, 50.0], [170.0, 50.0], [170.0, 40.0], [180.0, 40.0]]],
+ [[[-170.0, 40.0], [-170.0, 50.0], [-180.0, 50.0], [-180.0, 40.0], [-170.0, 40.0]]]
+ ]
+ },
+ "properties": {
+ "prop0": "value4",
+ "prop1": {"this": "that"}
+ }
+ },
+ {
+ "type": "Feature",
+ "geometry": {
+ "type": "MultiPoint",
+ "coordinates": [
+ [100.0, 0.0],
+ [101.0, 1.0]
+ ]
+ },
+ "properties": {
+ "prop0": "value5",
+ "prop1": {"this": "that"}
+ }
+ }
+ ]
+}
\ No newline at end of file
diff --git a/spark/common/src/test/resources/geojson/test1.json b/spark/common/src/test/resources/geojson/test1.json
new file mode 100644
index 000000000..ed8a71dc5
--- /dev/null
+++ b/spark/common/src/test/resources/geojson/test1.json
@@ -0,0 +1,39 @@
+{
+ "type": "Feature",
+ "geometry": {
+ "type": "Polygon",
+ "coordinates": [
+ [
+ [
+ 172.91173669923782,
+ 1.3438851951615003
+ ],
+ [
+ 172.95469614953714,
+ 1.3438851951615003
+ ],
+ [
+ 172.95469614953714,
+ 1.3690476620161975
+ ],
+ [
+ 172.91173669923782,
+ 1.3690476620161975
+ ],
+ [
+ 172.91173669923782,
+ 1.3438851951615003
+ ]
+ ]
+ ]
+ },
+ "properties": {
+ "title": "Core Item",
+ "description": "A sample STAC Item that includes examples of all common metadata",
+ "start_datetime": "2020-12-11T22:38:32.125Z",
+ "end_datetime": "2020-12-11T22:38:32.327Z",
+ "created": "2020-12-12T01:48:13.725Z",
+ "gsd": 0.512
+ },
+ "assets": []
+}
diff --git a/spark/common/src/test/resources/geojson/test12.json b/spark/common/src/test/resources/geojson/test12.json
new file mode 100644
index 000000000..6b75c89a2
--- /dev/null
+++ b/spark/common/src/test/resources/geojson/test12.json
@@ -0,0 +1,41 @@
+{
+ "type": "Feature",
+ "geometry": {
+ "type": "Polygon",
+ "coordinates": [
+ [
+ [
+ 172.91173669923782,
+ 1.3438851951615003
+ ],
+ [
+ 172.95469614953714,
+ 1.3438851951615003
+ ],
+ [
+ 172.95469614953714,
+ 1.3690476620161975
+ ],
+ [
+ 172.91173669923782,
+ 1.3690476620161975
+ ],
+ [
+ 172.91173669923782,
+ 1.3438851951615003
+ ]
+ ]
+ ]
+ },
+ "properties": {
+ "title": "Core Item",
+ "description": "A sample STAC Item that includes examples of all common metadata",
+ "start_datetime": "2020-12-11T22:38:32.125Z",
+ "end_datetime": "2020-12-11T22:38:32.327Z",
+ "created": "2020-12-12T01:48:13.725Z",
+ "gsd": 0.512
+ },
+ "assets": {
+ "proj": [[]]
+ }
+}
diff --git a/spark/common/src/test/resources/geojson/test2.json b/spark/common/src/test/resources/geojson/test2.json
new file mode 100644
index 000000000..f2a2dec4b
--- /dev/null
+++ b/spark/common/src/test/resources/geojson/test2.json
@@ -0,0 +1,21 @@
+{
+ "type": "Feature",
+ "geometry": {
+ "type": "MultiLineString",
+ "coordinates": [
+ [
+ [170.0, 45.0], [180.0, 45.0]
+ ], [
+ [-180.0, 45.0], [-170.0, 45.0]
+ ]
+ ]
+ },
+ "properties": {
+ "title": "Core Item",
+ "description": "A sample STAC Item that includes examples of all common metadata",
+ "start_datetime": "2020-12-11T22:38:32.125Z",
+ "end_datetime": "2020-12-11T22:38:32.327Z",
+ "created": "2020-12-12T01:48:13.725Z",
+ "gsd": 0.512
+ }
+}
diff --git a/spark/common/src/test/scala/org/apache/sedona/sql/geojsonIOTests.scala b/spark/common/src/test/scala/org/apache/sedona/sql/geojsonIOTests.scala
new file mode 100644
index 000000000..a1e982fcd
--- /dev/null
+++ b/spark/common/src/test/scala/org/apache/sedona/sql/geojsonIOTests.scala
@@ -0,0 +1,224 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sedona.sql
+
+import org.apache.commons.io.FileUtils
+import org.apache.spark.sql.{Row, SaveMode}
+import org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema
+import org.apache.spark.sql.functions.{col, explode, expr}
+import org.locationtech.jts.geom.{Geometry, MultiLineString, Point, Polygon}
+import org.scalatest.BeforeAndAfterAll
+
+import java.io.File
+
+class geojsonIOTests extends TestBaseScala with BeforeAndAfterAll {
+ val geojsondatalocation1: String = resourceFolder + "geojson/test1*"
+ val geojsondatalocation2: String = resourceFolder + "geojson/geojson_feature-collection.json"
+ val geojsondatalocation3: String = resourceFolder + "geojson/core-item.json"
+ val geojsondatalocation4: String = resourceFolder + "geojson/test2.json"
+ val geojsonoutputlocation: String = resourceFolder + "geojson/geojson_output/"
+
+ override def afterAll(): Unit = FileUtils.deleteDirectory(new File(geojsonoutputlocation))
+
+ describe("GeoJSON IO tests") {
+ it("GeoJSON Test - Simple DataFrame writing and reading") {
+ val df = sparkSession.range(0, 10).toDF("id")
+ .withColumn("geometry", expr("ST_Point(id, id)"))
+ .withColumn("text", expr("concat('test', id)"))
+ df.write.format("geojson").mode(SaveMode.Overwrite).save(geojsonoutputlocation + "/geojson_write.json")
+
+ // Read the GeoJSON back using JSON reader
+ val schema = "type string, geometry string, properties struct<id:int, text:string>"
+ val dfJson = sparkSession.read.schema(schema).format("json").load(geojsonoutputlocation + "/geojson_write.json")
+ dfJson.collect().foreach { row =>
+ assert(row.getAs("geometry").toString.startsWith("{\"type\":\"Point\""))
+ assert(row.getAs[GenericRowWithSchema]("properties").getAs("text").toString.startsWith("test"))
+ }
+
+ // Read the GeoJSON back using the GeoJSON reader
+ val dfW = sparkSession.read.format("geojson").load(geojsonoutputlocation + "/geojson_write.json")
+ dfW.collect().foreach { row =>
+ val geom = row.getAs[Geometry]("geometry")
+ assert(geom.isInstanceOf[Point])
+ val properties = row.getAs[GenericRowWithSchema]("properties")
+ assert(properties.getAs("text").toString.startsWith("test"))
+ }
+ }
+
+ it("GeoJSON Test - Specifying geometry column other than geometry") {
+ val df = sparkSession.range(0, 10).toDF("id")
+ .withColumn("point", expr("ST_Point(id, id)"))
+ .withColumn("geom", expr("ST_MakeLine(ST_Point(id, id), ST_Point(id, id + 1))"))
+ .withColumn("text", expr("concat('test', id)"))
+ df.write.format("geojson").option("geometry.column", "point").mode(SaveMode.Overwrite).save(geojsonoutputlocation + "/geojson_write.json")
+
+ // Read the GeoJSON back using JSON reader
+ val schema = "type string, geometry string, properties struct<id:int, text:string, geom:string>"
+ val dfJson = sparkSession.read.schema(schema).format("json").load(geojsonoutputlocation + "/geojson_write.json")
+ dfJson.collect().foreach { row =>
+ assert(row.getAs("geometry").toString.startsWith("{\"type\":\"Point\""))
+ val properties = row.getAs[GenericRowWithSchema]("properties")
+ assert(properties.getAs("text").toString.startsWith("test"))
+ assert(properties.getAs("geom").toString.startsWith("LINESTRING"))
+ }
+
+ // Read the GeoJSON back using the GeoJSON reader
+ val dfW = sparkSession.read.format("geojson").load(geojsonoutputlocation + "/geojson_write.json")
+ dfW.collect().foreach { row =>
+ val geom = row.getAs[Geometry]("geometry")
+ assert(geom.isInstanceOf[Point])
+ val properties = row.getAs[GenericRowWithSchema]("properties")
+ assert(properties.getAs("text").toString.startsWith("test"))
+ assert(properties.getAs("geom").toString.startsWith("LINESTRING"))
+ }
+ }
+
+ it("GeoJSON Test - Specifying geometry column in a nested struct column") {
+ val df = sparkSession.range(0, 10).toDF("id")
+ .withColumn("text_outer", expr("concat('test_outer', id)"))
+ .withColumn("nested", expr("struct(id, concat('test_inner', id) AS text_inner, ST_Point(id, id) AS geom)"))
+ df.write.format("geojson").option("geometry.column", "nested.geom").mode(SaveMode.Overwrite).save(geojsonoutputlocation + "/geojson_write.json")
+
+ // Read the GeoJSON back using JSON reader
+ val schema = "type string, geometry string, properties struct<text_outer:string, nested:struct<id:int, text_inner:string>>"
+ val dfJson = sparkSession.read.schema(schema).format("json").load(geojsonoutputlocation + "/geojson_write.json")
+ dfJson.collect().foreach { row =>
+ assert(row.getAs("geometry").toString.startsWith("{\"type\":\"Point\""))
+ val properties = row.getAs[GenericRowWithSchema]("properties")
+ assert(properties.getAs("text_outer").toString.startsWith("test_outer"))
+ val nested = properties.getAs[GenericRowWithSchema]("nested")
+ assert(nested.getAs("text_inner").toString.startsWith("test_inner"))
+ }
+
+ // Read the GeoJSON back using the GeoJSON reader
+ val dfW = sparkSession.read.format("geojson").load(geojsonoutputlocation + "/geojson_write.json")
+ dfW.collect().foreach { row =>
+ val geom = row.getAs[Geometry]("geometry")
+ assert(geom.isInstanceOf[Point])
+ val properties = row.getAs[GenericRowWithSchema]("properties")
+ assert(properties.getAs("text_outer").toString.startsWith("test_outer"))
+ val nested = properties.getAs[GenericRowWithSchema]("nested")
+ assert(nested.getAs("text_inner").toString.startsWith("test_inner"))
+ }
+ }
+
+ it("GeoJSON Test - DataFrame containing properties column") {
+ val df = sparkSession.range(0, 10).toDF("id")
+ .withColumn("point", expr("ST_Point(id, id)"))
+ .withColumn("test_outer", expr("concat('test_outer', id)"))
+ .withColumn("properties", expr("struct(id, concat('test', id) AS text)"))
+ df.write.format("geojson").option("geometry.column", "point").mode(SaveMode.Overwrite).save(geojsonoutputlocation + "/geojson_write.json")
+
+ // Read the GeoJSON back using JSON reader
+ val schema = "type string, geometry string, test_outer string, properties struct<id:int, text:string>"
+ val dfJson = sparkSession.read.schema(schema).format("json").load(geojsonoutputlocation + "/geojson_write.json")
+ dfJson.collect().foreach { row =>
+ assert(row.getAs("geometry").toString.startsWith("{\"type\":\"Point\""))
+ assert(row.getAs[String]("test_outer").startsWith("test_outer"))
+ val properties = row.getAs[GenericRowWithSchema]("properties")
+ assert(properties.getAs("text").toString.startsWith("test"))
+ }
+
+ // Read the GeoJSON back using the GeoJSON reader
+ val dfW = sparkSession.read.format("geojson").load(geojsonoutputlocation + "/geojson_write.json")
+ dfW.collect().foreach { row =>
+ val geom = row.getAs[Geometry]("geometry")
+ assert(geom.isInstanceOf[Point])
+ assert(row.getAs[String]("test_outer").startsWith("test_outer"))
+ val properties = row.getAs[GenericRowWithSchema]("properties")
+ assert(properties.getAs("text").toString.startsWith("test"))
+ }
+ }
+
+ it("GeoJSON Test - Read and Write multiline GeoJSON file") {
+ val dfR = sparkSession.read.format("geojson").option("multiLine", true).load(geojsondatalocation1)
+ val rowsR = dfR.collect()(0)
+
+ assert(rowsR.getAs[GenericRowWithSchema]("assets") != null)
+ assert(rowsR.getAs[String]("type") == "Feature")
+ assert(rowsR.getAs[GenericRowWithSchema]("properties").getString(0) == "2020-12-12T01:48:13.725Z")
+ assert(rowsR.getAs[GenericRowWithSchema]("properties").getString(1) == "A sample STAC Item that includes examples of all common metadata")
+ assert(rowsR.getAs[GenericRowWithSchema]("properties").getString(5) == "Core Item")
+ assert(rowsR.getAs[Polygon]("geometry").toString == "POLYGON ((172.91173669923782 1.3438851951615003, 172.95469614953714 1.3438851951615003, 172.95469614953714 1.3690476620161975, 172.91173669923782 1.3690476620161975, 172.91173669923782 1.3438851951615003))")
+
+ dfR.write.format("geojson").option("multiLine", true).mode(SaveMode.Overwrite).save(geojsonoutputlocation + "/geojson_write.json")
+
+ val dfW = sparkSession.read.format("geojson").load(geojsonoutputlocation + "/geojson_write.json")
+ val rowsW = dfW.collect()(0)
+ assert(rowsR.getAs[GenericRowWithSchema]("properties") == rowsW.getAs[GenericRowWithSchema]("properties"))
+ assert(rowsR.getAs[Polygon]("geometry") == rowsW.getAs[Polygon]("geometry"))
+ assert(rowsR.getAs[String]("type") == rowsW.getAs[String]("type"))
+ }
+ it("GeoJSON Test - Read and Write MultiLineString geometry") {
+ val dfR = sparkSession.read.format("geojson").option("multiLine", true).load(geojsondatalocation4)
+ val rowsR = dfR.collect()(0)
+ assert(rowsR.getAs[String]("type") == "Feature")
+ assert(rowsR.getAs[GenericRowWithSchema]("properties").getString(0) == "2020-12-12T01:48:13.725Z")
+ assert(rowsR.getAs[GenericRowWithSchema]("properties").getString(1) == "A sample STAC Item that includes examples of all common metadata")
+ assert(rowsR.getAs[GenericRowWithSchema]("properties").getString(5) == "Core Item")
+ assert(rowsR.getAs[MultiLineString]("geometry").toString == "MULTILINESTRING ((170 45, 180 45), (-180 45, -170 45))")
+
+ dfR.write.format("geojson").option("multiLine", true).mode(SaveMode.Overwrite).save(geojsonoutputlocation + "/geojson_write.json")
+
+ val dfW = sparkSession.read.format("geojson").load(geojsonoutputlocation + "/geojson_write.json")
+ val rowsW = dfW.collect()(0)
+ assert(rowsR.getAs[GenericRowWithSchema]("properties") == rowsW.getAs[GenericRowWithSchema]("properties"))
+ assert(rowsR.getAs[MultiLineString]("geometry") == rowsW.getAs[MultiLineString]("geometry"))
+ assert(rowsR.getAs[String]("type") == rowsW.getAs[String]("type"))
+ }
+ it("GeoJSON Test - feature collection test") {
+ val dfR = sparkSession.read.format("geojson").option("multiLine", true).load(geojsondatalocation2)
+ val rowsR = dfR.collect()(0)
+
+ assert(rowsR.getAs[Seq[Row]]("features")(0).get(0).toString == "POINT (102 0.5)")
+ assert(rowsR.getAs[Seq[Row]]("features")(1).get(0).toString == "LINESTRING (102 0, 103 1, 104 0, 105 1)")
+ assert(rowsR.getAs[Seq[Row]]("features")(2).get(0).toString == "POLYGON ((100 0, 101 0, 101 1, 100 1, 100 0))")
+ assert(rowsR.getAs[Seq[Row]]("features")(3).get(0).toString == "MULTILINESTRING ((170 45, 180 45), (-180 45, -170 45))")
+ assert(rowsR.getAs[Seq[Row]]("features")(4).get(0).toString == "MULTIPOLYGON (((180 40, 180 50, 170 50, 170 40, 180 40)), ((-170 40, -170 50, -180 50, -180 40, -170 40)))")
+
+ val df = dfR.select(explode(col("features")).alias("feature")).selectExpr("feature.*")
+ df.write.format("geojson").option("multiLine", true).mode(SaveMode.Overwrite).save(geojsonoutputlocation + "/geojson_write.json")
+
+ val dfW = sparkSession.read.format("geojson").load(geojsonoutputlocation + "/geojson_write.json")
+ val rowsW = dfW.collect()
+ assert(rowsW.length == 6)
+ assert(rowsW(0).getAs("geometry").toString == "POINT (102 0.5)")
+ assert(rowsW(1).getAs("geometry").toString == "LINESTRING (102 0, 103 1, 104 0, 105 1)")
+ assert(rowsW(2).getAs("geometry").toString == "POLYGON ((100 0, 101 0, 101 1, 100 1, 100 0))")
+ assert(rowsW(3).getAs("geometry").toString == "MULTILINESTRING ((170 45, 180 45), (-180 45, -170 45))")
+ assert(rowsW(4).getAs("geometry").toString == "MULTIPOLYGON (((180 40, 180 50, 170 50, 170 40, 180 40)), ((-170 40, -170 50, -180 50, -180 40, -170 40)))")
+ }
+ it("GeoJSON Test - read and write single line STAC item") {
+ val dfR = sparkSession.read.format("geojson").load(geojsondatalocation3)
+ val rowsR = dfR.collect()(0)
+ assert(rowsR.getAs[String]("type") == "Feature")
+ assert(rowsR.getAs[String]("stac_version") == "1.0.0")
+ assert(rowsR.getAs[Polygon]("geometry").toString == "POLYGON ((172.91173669923782 1.3438851951615003, 172.95469614953714 1.3438851951615003, 172.95469614953714 1.3690476620161975, 172.91173669923782 1.3690476620161975, 172.91173669923782 1.3438851951615003))")
+
+ dfR.write.format("geojson").mode(SaveMode.Overwrite).save(geojsonoutputlocation + "/geojson_write.json")
+
+ val dfW = sparkSession.read.format("geojson").load(geojsonoutputlocation + "/geojson_write.json")
+
+ val rowsW = dfW.collect()(0)
+ assert(rowsR.getAs[GenericRowWithSchema]("assets") == rowsW.getAs[GenericRowWithSchema]("assets"))
+ assert(rowsR.getAs[String]("stac_version") == rowsW.getAs[String]("stac_version"))
+ assert(rowsR.getAs[Polygon]("geometry") == rowsW.getAs[Polygon]("geometry"))
+ }
+ }
+}
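The feature-collection test above reads a `FeatureCollection`, explodes its `features` array, and writes each feature on its own line, which is the line-delimited layout the GeoJSON writer produces and the reader consumes by default. The same transformation can be sketched with Python's standard `json` module (the inline collection below is a trimmed, illustrative version of the `geojson_feature-collection.json` fixture):

```python
import json

# A tiny FeatureCollection, shaped like the geojson_feature-collection.json fixture
collection = json.loads("""
{"type": "FeatureCollection", "features": [
  {"type": "Feature", "geometry": {"type": "Point", "coordinates": [102.0, 0.5]},
   "properties": {"prop0": "value0"}},
  {"type": "Feature", "geometry": {"type": "LineString",
   "coordinates": [[102.0, 0.0], [103.0, 1.0]]}, "properties": {"prop0": "value1"}}
]}
""")

# "Explode" the features array into line-delimited GeoJSON, one Feature per line
lines = [json.dumps(feature) for feature in collection["features"]]
for line in lines:
    print(line)

assert all(json.loads(l)["type"] == "Feature" for l in lines)
```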