This is an automated email from the ASF dual-hosted git repository.
jiayu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/sedona.git
The following commit(s) were added to refs/heads/master by this push:
new b3ce9012 [SEDONA-243] R features: read/write geoparquet, get names
from shapefiles (#770)
b3ce9012 is described below
commit b3ce9012d6a87b89d68227d9c56e42e65e45f651
Author: gregleleu <[email protected]>
AuthorDate: Thu Feb 16 19:07:11 2023 -0500
[SEDONA-243] R features: read/write geoparquet, get names from shapefiles
(#770)
---
R/NAMESPACE | 3 +
R/R/data_interface.R | 99 +++++++++++++++++++++++++
R/R/sdf_interface.R | 14 +++-
R/tests/testthat/test-data-interface.R | 129 ++++++++++++++++++++++++++++-----
R/tests/testthat/test-sdf-interface.R | 46 +++++++++++-
5 files changed, 267 insertions(+), 24 deletions(-)
diff --git a/R/NAMESPACE b/R/NAMESPACE
index 5edffa04..f384a196 100644
--- a/R/NAMESPACE
+++ b/R/NAMESPACE
@@ -13,6 +13,7 @@ export(sedona_range_query)
export(sedona_read_dsv_to_typed_rdd)
export(sedona_read_geojson)
export(sedona_read_geojson_to_typed_rdd)
+export(sedona_read_geoparquet)
export(sedona_read_shapefile)
export(sedona_read_shapefile_to_typed_rdd)
export(sedona_read_wkb)
@@ -24,9 +25,11 @@ export(sedona_save_spatial_rdd)
export(sedona_spatial_join)
export(sedona_spatial_join_count_by_key)
export(sedona_write_geojson)
+export(sedona_write_geoparquet)
export(sedona_write_wkb)
export(sedona_write_wkt)
export(to_spatial_rdd)
import(sparklyr)
importFrom(rlang,"%||%")
importFrom(sparklyr,sdf_register)
+importFrom(sparklyr,spark_dataframe)
diff --git a/R/R/data_interface.R b/R/R/data_interface.R
index 436da7bb..0f03cdd8 100644
--- a/R/R/data_interface.R
+++ b/R/R/data_interface.R
@@ -15,6 +15,9 @@
# specific language governing permissions and limitations
# under the License.
+
+# ------- Read ------------
+
#' Create a SpatialRDD from an external data source.
#' Import spatial object from an external data source into a Sedona SpatialRDD.
#'
@@ -439,6 +442,55 @@ sedona_read_shapefile <- function(sc,
new_spatial_rdd(NULL)
}
+
+#' Read a geoparquet file into a Spark DataFrame.
+#' Read a geoparquet file into a Spark DataFrame. The created dataframe is automatically registered.
+#'
+#' @param sc A \code{spark_connection}.
+#' @param location Location of the data source.
+#' @param name The name to assign to the newly generated table.
+#'
+#'
+#' @return A tbl
+#'
+#' @examples
+#' library(sparklyr)
+#' library(apache.sedona)
+#'
+#' sc <- spark_connect(master = "spark://HOST:PORT")
+#'
+#' if (!inherits(sc, "test_connection")) {
+#' input_location <- "/dev/null" # replace it with the path to your input file
+#' rdd <- sedona_read_geoparquet(sc, location = input_location)
+#' }
+#'
+#' @family Sedona data interface functions
+#'
+#' @export
+sedona_read_geoparquet <- function(sc,
+ location,
+ name = NULL) {
+
+ ## sparklyr's `%<-%` operator is not available here; maybe replicate it later
+ checked <- sparklyr:::spark_read_compat_param(sc, name, location)
+ name <- checked[[1]]
+ path <- checked[[2]]
+
+ df <-
+ invoke(
+ spark_session(sc),
+ "%>%",
+ list("read"), list("format", "geoparquet"), list("load",
sparklyr:::spark_normalize_path(path)))
+
+
+
+ sdf_register(df, name = name)
+}
+
+
+# ------- Write ------------
+
+
#' Write SpatialRDD into a file.
#'
#' Export serialized data from a Sedona SpatialRDD into an output file.
@@ -589,6 +641,53 @@ sedona_save_spatial_rdd <- function(x,
)
}
+
+#' Save a Spark dataframe into a geoparquet file.
+#'
+#' Export spatial data from a Spark dataframe into a geoparquet file.
+#'
+#' @param x A Spark dataframe object in sparklyr or a dplyr expression
+#' representing a Spark SQL query.
+#' @param output_location Location of the output file.
+#'
+#'
+#' @return NULL
+#'
+#' @examples
+#' library(sparklyr)
+#' library(apache.sedona)
+#'
+#' sc <- spark_connect(master = "spark://HOST:PORT")
+#'
+#' if (!inherits(sc, "test_connection")) {
+#' tbl <- dplyr::tbl(
+#' sc,
+#' dplyr::sql("SELECT ST_GeomFromText('POINT(-71.064544 42.28787)') AS `pt`")
+#' )
+#' sedona_write_geoparquet(
+#' tbl %>% dplyr::mutate(id = 1),
+#' output_location = "/tmp/pts.geoparquet"
+#' )
+#' }
+#'
+#' @family Sedona data interface functions
+#'
+#' @importFrom sparklyr spark_dataframe
+#' @export
+sedona_write_geoparquet <- function(x,
+ output_location) {
+
+ ## Get back jobj
+ x_obj <- spark_dataframe(x)
+
+ invoke(
+ x_obj,
+ "%>%",
+ list("write"), list("format", "geoparquet"), list("save",
sparklyr:::spark_normalize_path(output_location)))
+
+}
+
+# ------- Utilities ------------
rdd_cls_from_type <- function(type = c("point", "polygon", "linestring")) {
type <- match.arg(type)
diff --git a/R/R/sdf_interface.R b/R/R/sdf_interface.R
index 44bf751c..c0a5d5f7 100644
--- a/R/R/sdf_interface.R
+++ b/R/R/sdf_interface.R
@@ -60,7 +60,7 @@ sdf_register.spatial_rdd <- function(x, name = NULL) {
#'
#' @inheritParams as_spark_dataframe
#' @param non_spatial_cols Column names for non-spatial attributes in the
-#' resulting Spark Dataframe.
+#' resulting Spark Dataframe. By default (NULL) it will import all field names if that property exists, in particular for shapefiles.
#'
#' @return A Spark Dataframe containing the imported spatial data.
#'
@@ -86,6 +86,18 @@ sdf_register.spatial_rdd <- function(x, name = NULL) {
#' @export
as.spark.dataframe <- function(x, non_spatial_cols = NULL, name = NULL) {
sc <- spark_connection(x$.jobj)
+
+ # Default: keep all columns
+ if (is.null(non_spatial_cols)) {
+ if (!is.null(invoke(x$.jobj, "%>%", list("fieldNames")))) { ## Only if
dataset has field names
+ non_spatial_cols <- invoke(x$.jobj, "%>%", list("fieldNames"),
list("toString")) ### Get columns names
+ non_spatial_cols <- gsub("(^\\[|\\]$)", "", non_spatial_cols) #####
remove brackets
+ non_spatial_cols <- strsplit(non_spatial_cols, ", ")[[1]] ##### turn
into list
+ }
+ } else {
+ stopifnot("non_spatial_cols needs to be a charcter vector (or NULL,
default)" = is.character(non_spatial_cols))
+ }
+
sdf <- invoke_static(
sc,
"org.apache.sedona.sql.utils.Adapter",
diff --git a/R/tests/testthat/test-data-interface.R
b/R/tests/testthat/test-data-interface.R
index d6a58bef..f10950de 100644
--- a/R/tests/testthat/test-data-interface.R
+++ b/R/tests/testthat/test-data-interface.R
@@ -23,6 +23,10 @@ shapefile <- function(filename) {
test_data(file.path("shapefiles", filename))
}
+geoparquet <- function(filename) {
+ test_data(file.path("geoparquet", filename))
+}
+
test_rdd_with_non_spatial_attrs <- invoke_new(
sc,
"org.apache.sedona.core.spatialRDD.PointRDD",
@@ -75,7 +79,7 @@ test_that("sedona_read_dsv_to_typed_rdd() creates PointRDD
correctly", {
first_spatial_col_index = 1,
has_non_spatial_attrs = TRUE
)
-
+
expect_equal(class(pt_rdd), c("point_rdd", "spatial_rdd"))
expect_equal(pt_rdd$.jobj %>% invoke("approximateTotalCount"), 3000)
expect_boundary_envelope(pt_rdd, c(-173.120769, -84.965961, 30.244859,
71.355134))
@@ -103,7 +107,7 @@ test_that("sedona_read_dsv_to_typed_rdd() creates
PolygonRDD correctly", {
first_spatial_col_index = 0,
has_non_spatial_attrs = FALSE
)
-
+
expect_equal(class(polygon_rdd), c("polygon_rdd", "spatial_rdd"))
expect_equal(polygon_rdd$.jobj %>% invoke("approximateTotalCount"), 3000)
expect_boundary_envelope(polygon_rdd, c(-158.104182, -66.03575, 17.986328,
48.645133))
@@ -118,7 +122,7 @@ test_that("sedona_read_dsv_to_typed_rdd() creates
LineStringRDD correctly", {
first_spatial_col_index = 0,
has_non_spatial_attrs = FALSE
)
-
+
expect_equal(class(linestring_rdd), c("linestring_rdd", "spatial_rdd"))
expect_equal(linestring_rdd$.jobj %>% invoke("approximateTotalCount"), 3000)
expect_boundary_envelope(linestring_rdd, c(-123.393766, -65.648659,
17.982169, 49.002374))
@@ -130,7 +134,7 @@ test_that("sedona_read_geojson_to_typed_rdd() creates
PointRDD correctly", {
location = test_data("points.json"),
type = "point"
)
-
+
expect_equal(class(pt_rdd), c("point_rdd", "spatial_rdd"))
expect_equal(pt_rdd$.jobj %>% invoke("approximateTotalCount"), 7)
expect_boundary_envelope(pt_rdd, c(-88.1234, -85.3333, 31.3699, 34.9876))
@@ -156,7 +160,7 @@ test_that("sedona_read_geojson_to_typed_rdd() creates
PolygonRDD correctly", {
type = "polygon",
has_non_spatial_attrs = TRUE
)
-
+
expect_equal(class(polygon_rdd), c("polygon_rdd", "spatial_rdd"))
expect_equal(polygon_rdd$.jobj %>% invoke("approximateTotalCount"), 1001)
expect_false(is.null(polygon_rdd$.jobj %>% invoke("boundaryEnvelope")))
@@ -182,7 +186,7 @@ test_that("sedona_read_geojson_to_typed_rdd() creates
PolygonRDD correctly", {
test_that("sedona_read_geojson() works as expected on geojson input with
'type' and 'geometry' properties", {
geojson_rdd <- sedona_read_geojson(sc, test_data("testPolygon.json"))
-
+
expect_equal(
geojson_rdd$.jobj %>% invoke("%>%", list("rawSpatialRDD"), list("count")),
1001
)
@@ -190,7 +194,7 @@ test_that("sedona_read_geojson() works as expected on
geojson input with 'type'
test_that("sedona_read_geojson() works as expected on geojson input without
'type' or 'geometry' properties", {
geojson_rdd <- sedona_read_geojson(sc,
test_data("testpolygon-no-property.json"))
-
+
expect_equal(
geojson_rdd$.jobj %>% invoke("%>%", list("rawSpatialRDD"), list("count")),
10
)
@@ -198,7 +202,7 @@ test_that("sedona_read_geojson() works as expected on
geojson input without 'typ
test_that("sedona_read_geojson() works as expected on geojson input with null
property value", {
geojson_rdd <- sedona_read_geojson(sc,
test_data("testpolygon-with-null-property-value.json"))
-
+
expect_equal(
geojson_rdd$.jobj %>% invoke("%>%", list("rawSpatialRDD"), list("count")),
3
)
@@ -211,18 +215,18 @@ test_that("sedona_read_geojson() can skip invalid
geometries correctly", {
allow_invalid_geometries = TRUE,
skip_syntactically_invalid_geometries = FALSE
)
-
+
expect_equal(
geojson_rdd$.jobj %>% invoke("%>%", list("rawSpatialRDD"), list("count")),
3
)
-
+
geojson_rdd <- sedona_read_geojson(
sc,
test_data("testInvalidPolygon.json"),
allow_invalid_geometries = FALSE,
skip_syntactically_invalid_geometries = FALSE
)
-
+
expect_equal(
geojson_rdd$.jobj %>% invoke("%>%", list("rawSpatialRDD"), list("count")),
2
)
@@ -230,7 +234,7 @@ test_that("sedona_read_geojson() can skip invalid
geometries correctly", {
test_that("sedona_read_wkb() works as expected", {
wkb_rdd <- sedona_read_wkb(sc, test_data("county_small_wkb.tsv"))
-
+
expect_equal(
wkb_rdd$.jobj %>% invoke("%>%", list("rawSpatialRDD"), list("count")), 103
)
@@ -241,7 +245,7 @@ test_that("sedona_read_shapefile_to_typed_rdd() creates
PointRDD correctly", {
sc,
location = shapefile("point"), type = "point"
)
-
+
expect_equal(class(pt_rdd), c("point_rdd", "spatial_rdd"))
expect_equal(
pt_rdd$.jobj %>% invoke("%>%", list("rawSpatialRDD"), list("count")),
@@ -254,7 +258,7 @@ test_that("sedona_read_shapefile_to_typed_rdd() creates
PolygonRDD correctly", {
sc,
location = shapefile("polygon"), type = "polygon"
)
-
+
expect_equal(class(polygon_rdd), c("polygon_rdd", "spatial_rdd"))
expect_equal(
polygon_rdd$.jobj %>% invoke("%>%", list("rawSpatialRDD"), list("count")),
@@ -267,7 +271,7 @@ test_that("sedona_read_shapefile_to_typed_rdd() creates
LineStringRDD correctly"
sc,
location = shapefile("polyline"), type = "linestring"
)
-
+
expect_equal(class(linestring_rdd), c("linestring_rdd", "spatial_rdd"))
expect_equal(
linestring_rdd$.jobj %>% invoke("%>%", list("rawSpatialRDD"),
list("count")),
@@ -277,12 +281,97 @@ test_that("sedona_read_shapefile_to_typed_rdd() creates
LineStringRDD correctly"
test_that("sedona_read_shapefile() works as expected", {
wkb_rdd <- sedona_read_shapefile(sc, shapefile("polygon"))
-
+
expect_equal(
wkb_rdd$.jobj %>% invoke("%>%", list("rawSpatialRDD"), list("count")),
10000
)
})
+
+#### TESTS TO WRITE: writing
+
+test_that("sedona_read_geoparquet() works as expected", {
+ sdf_name <- random_string("spatial_sdf")
+ geoparquet_sdf <- sedona_read_geoparquet(sc, geoparquet("example1.parquet"),
name = sdf_name)
+
+ ## Right number of rows
+ geoparquet_df <-
+ geoparquet_sdf %>%
+ sparklyr:::spark_sqlresult_from_dplyr()
+
+ expect_equal(
+ invoke(geoparquet_df, 'count'), 5
+ )
+
+ ## Right registered name
+ expect_equal(geoparquet_sdf %>% dbplyr::remote_name(),
dbplyr::ident(sdf_name))
+
+ ## Right schema
+ expect_equivalent(
+ geoparquet_sdf %>% sdf_schema(),
+ list(
+ pop_est = list(name = "pop_est", type = "LongType"),
+ continent = list(name = "continent", type = "StringType"),
+ name = list(name = "name", type = "StringType"),
+ iso_a3 = list(name = "iso_a3", type = "StringType"),
+ gdp_md_est = list(name = "gdp_md_est", type = "DoubleType"),
+ geometry = list(name = "geometry", type = "GeometryUDT")
+ )
+ )
+
+ ## Right data (first row)
+ expect_equivalent(
+ geoparquet_sdf %>% head(1) %>% mutate(geometry = geometry %>% st_astext())
%>% collect() %>% as.list(),
+ list(
+ pop_est = 920938,
+ continent = "Oceania",
+ name = "Fiji",
+ iso_a3 = "FJI",
+ gdp_md_est = 8374,
+ geometry = "MULTIPOLYGON (((180 -16.067132663642447, 180
-16.555216566639196, 179.36414266196414 -16.801354076946883, 178.72505936299711
-17.01204167436804, 178.59683859511713 -16.639150000000004, 179.0966093629971
-16.433984277547403, 179.4135093629971 -16.379054277547404, 180
-16.067132663642447)), ((178.12557 -17.50481, 178.3736 -17.33992, 178.71806
-17.62846, 178.55271 -18.15059, 177.93266000000003 -18.28799, 177.38146
-18.16432, 177.28504 -17.72465, 177.67087 -17.3811400000000 [...]
+ )
+ )
+
+ ## Spatial predicate
+ filtered <-
+ geoparquet_sdf %>%
+ filter(ST_Intersects(ST_Point(35.174722, -6.552465), geometry)) %>%
+ collect()
+ expect_equal(filtered %>% nrow(), 1)
+ expect_equal(filtered$name, "Tanzania")
+
+})
+
+
+test_that("sedona_write_geoparquet() works as expected", {
+ geoparquet_sdf <- sedona_read_geoparquet(sc, geoparquet("example2.parquet"))
+ tmp_dest <- tempfile(fileext = ".parquet")
+
+ ## Save
+ geoparquet_sdf %>% sedona_write_geoparquet(tmp_dest)
+
+ ### Reload
+ geoparquet_2_sdf <- sedona_read_geoparquet(sc, tmp_dest)
+
+ expect_equivalent(
+ geoparquet_sdf %>% mutate(geometry = geometry %>% st_astext()) %>%
collect(),
+ geoparquet_2_sdf %>% mutate(geometry = geometry %>% st_astext()) %>%
collect()
+ )
+
+ unlink(tmp_dest, recursive = TRUE)
+
+})
+
+
+test_that("sedona_read_geoparquet() throws an error with plain parquet files",
{
+
+ expect_error(
+ sedona_read_geoparquet(sc, geoparquet("plain.parquet")),
+ regexp = "GeoParquet file does not contain valid geo metadata"
+ )
+
+})
+
test_that("sedona_write_wkb() works as expected", {
output_location <- tempfile()
sedona_write_wkb(test_rdd_with_non_spatial_attrs, output_location)
@@ -297,7 +386,7 @@ test_that("sedona_write_wkb() works as expected", {
1L, # numPartitions
sc$state$object_cache$storage_levels$memory_only
)
-
+
expect_result_matches_original(pt_rdd)
})
@@ -315,7 +404,7 @@ test_that("sedona_write_wkt() works as expected", {
1L, # numPartitions
sc$state$object_cache$storage_levels$memory_only
)
-
+
expect_result_matches_original(pt_rdd)
})
@@ -333,7 +422,7 @@ test_that("sedona_write_geojson() works as expected", {
1L, # numPartitions
sc$state$object_cache$storage_levels$memory_only
)
-
+
expect_result_matches_original_geojson(pt_rdd)
})
@@ -352,7 +441,7 @@ test_that("sedona_save_spatial_rdd() works as expected", {
spatial_col = "pt", output_location = location, output_format = fmt
)
sdf <- do.call(paste0("sedona_read_", fmt), list(sc, location))
-
+
expect_equal(
sdf$.jobj %>% invoke("%>%", list("rawSpatialRDD"), list("count")), 1
)
diff --git a/R/tests/testthat/test-sdf-interface.R
b/R/tests/testthat/test-sdf-interface.R
index 9131a5cb..8d6dd66a 100644
--- a/R/tests/testthat/test-sdf-interface.R
+++ b/R/tests/testthat/test-sdf-interface.R
@@ -21,20 +21,60 @@ sc <- testthat_spark_connection()
pt_rdd <- read_point_rdd_with_non_spatial_attrs()
+shapefile <- function(filename) {
+ test_data(file.path("shapefiles", filename))
+}
+
test_that("sdf_register() works as expected for Spatial RDDs", {
sdf_name <- random_string("spatial_sdf")
pt_sdf <- sdf_register(pt_rdd, name = sdf_name)
-
+
expect_equivalent(
pt_sdf %>% sdf_schema(),
- list(geometry = list(name = "geometry", type = "GeometryUDT"))
+ list(
+ geometry = list(name = "geometry", type = "GeometryUDT")
+
+ )
)
expect_equal(pt_sdf %>% dbplyr::remote_name(), dbplyr::ident(sdf_name))
-
+
pt_sdf %>% collect()
succeed()
})
+
+test_that("sdf_register() works as expected for Spatial RDDs with fieldNames",
{
+ sdf_name <- random_string("spatial_sdf")
+ polygon_rdd <- sedona_read_shapefile_to_typed_rdd(
+ sc,
+ location = shapefile("dbf"), type = "polygon"
+ )
+ polygon_sdf <- sdf_register(polygon_rdd, name = sdf_name)
+
+ expect_equivalent(
+ polygon_sdf %>% sdf_schema(),
+ list(
+ geometry = list(name = "geometry", type = "GeometryUDT"),
+ geometry = list(name = "STATEFP", type = "StringType"),
+ geometry = list(name = "COUNTYFP", type = "StringType"),
+ geometry = list(name = "COUNTYNS", type = "StringType"),
+ geometry = list(name = "AFFGEOID", type = "StringType"),
+ geometry = list(name = "GEOID", type = "StringType"),
+ geometry = list(name = "NAME", type = "StringType"),
+ geometry = list(name = "LSAD", type = "StringType"),
+ geometry = list(name = "ALAND", type = "StringType"),
+ geometry = list(name = "AWATER", type = "StringType")
+
+ )
+ )
+
+ expect_equal(polygon_sdf %>% dbplyr::remote_name(), dbplyr::ident(sdf_name))
+
+ polygon_sdf %>% collect()
+ succeed()
+})
+
+
test_that("as.spark.dataframe() works as expected for Spatial RDDs with
non-spatial attributes", {
sdf_name <- random_string("spatial_sdf")
pt_sdf <- as.spark.dataframe(