This is an automated email from the ASF dual-hosted git repository.

jiayu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/sedona.git


The following commit(s) were added to refs/heads/master by this push:
     new b3ce9012 [SEDONA-243] R features: read/write geoparquet, get names 
from shapefiles (#770)
b3ce9012 is described below

commit b3ce9012d6a87b89d68227d9c56e42e65e45f651
Author: gregleleu <[email protected]>
AuthorDate: Thu Feb 16 19:07:11 2023 -0500

    [SEDONA-243] R features: read/write geoparquet, get names from shapefiles 
(#770)
---
 R/NAMESPACE                            |   3 +
 R/R/data_interface.R                   |  99 +++++++++++++++++++++++++
 R/R/sdf_interface.R                    |  14 +++-
 R/tests/testthat/test-data-interface.R | 129 ++++++++++++++++++++++++++++-----
 R/tests/testthat/test-sdf-interface.R  |  46 +++++++++++-
 5 files changed, 267 insertions(+), 24 deletions(-)

diff --git a/R/NAMESPACE b/R/NAMESPACE
index 5edffa04..f384a196 100644
--- a/R/NAMESPACE
+++ b/R/NAMESPACE
@@ -13,6 +13,7 @@ export(sedona_range_query)
 export(sedona_read_dsv_to_typed_rdd)
 export(sedona_read_geojson)
 export(sedona_read_geojson_to_typed_rdd)
+export(sedona_read_geoparquet)
 export(sedona_read_shapefile)
 export(sedona_read_shapefile_to_typed_rdd)
 export(sedona_read_wkb)
@@ -24,9 +25,11 @@ export(sedona_save_spatial_rdd)
 export(sedona_spatial_join)
 export(sedona_spatial_join_count_by_key)
 export(sedona_write_geojson)
+export(sedona_write_geoparquet)
 export(sedona_write_wkb)
 export(sedona_write_wkt)
 export(to_spatial_rdd)
 import(sparklyr)
 importFrom(rlang,"%||%")
 importFrom(sparklyr,sdf_register)
+importFrom(sparklyr,spark_dataframe)
diff --git a/R/R/data_interface.R b/R/R/data_interface.R
index 436da7bb..0f03cdd8 100644
--- a/R/R/data_interface.R
+++ b/R/R/data_interface.R
@@ -15,6 +15,9 @@
 #  specific language governing permissions and limitations
 #  under the License.
 
+
+# ------- Read ------------
+
 #' Create a SpatialRDD from an external data source.
 #' Import spatial object from an external data source into a Sedona SpatialRDD.
 #'
@@ -439,6 +442,55 @@ sedona_read_shapefile <- function(sc,
     new_spatial_rdd(NULL)
 }
 
+
+#' Read a geoparquet file into a Spark DataFrame.
+#' Read a geoparquet file into a Spark DataFrame. The created dataframe is automatically registered.
+#'
+#' @param sc A \code{spark_connection}.
+#' @param location Location of the data source.
+#' @param name The name to assign to the newly generated table.
+#'
+#'
+#' @return A tbl
+#'
+#' @examples
+#' library(sparklyr)
+#' library(apache.sedona)
+#'
+#' sc <- spark_connect(master = "spark://HOST:PORT")
+#'
+#' if (!inherits(sc, "test_connection")) {
+#'   input_location <- "/dev/null" # replace it with the path to your input file
+#'   rdd <- sedona_read_geoparquet(sc, location = input_location)
+#' }
+#'
+#' @family Sedona data interface functions
+#'
+#' @export
+sedona_read_geoparquet <- function(sc,
+                                   location,
+                                   name = NULL) {
+  
+  ## We don't have sparklyr's `%<-%` here; maybe replicate it later
+  checked <- sparklyr:::spark_read_compat_param(sc, name, location)
+  name <- checked[[1]]
+  path <- checked[[2]]
+  
+  df <- 
+    invoke(
+      spark_session(sc), 
+      "%>%", 
+      list("read"), list("format", "geoparquet"), list("load", 
sparklyr:::spark_normalize_path(path))) 
+  
+  
+  
+  sdf_register(df, name = name)
+}
+
+
+# ------- Write ------------
+
+
 #' Write SpatialRDD into a file.
 #'
 #' Export serialized data from a Sedona SpatialRDD into an output file.
@@ -589,6 +641,53 @@ sedona_save_spatial_rdd <- function(x,
   )
 }
 
+
+#' Save a Spark dataframe into a geoparquet file.
+#'
+#' Export spatial data from a Spark dataframe into a geoparquet file.
+#'
+#' @param x A Spark dataframe object in sparklyr or a dplyr expression
+#'   representing a Spark SQL query.
+#' @param output_location Location of the output file.
+#'
+#'
+#' @return NULL
+#'
+#' @examples
+#' library(sparklyr)
+#' library(apache.sedona)
+#'
+#' sc <- spark_connect(master = "spark://HOST:PORT")
+#'
+#' if (!inherits(sc, "test_connection")) {
+#'   tbl <- dplyr::tbl(
+#'     sc,
+#'     dplyr::sql("SELECT ST_GeomFromText('POINT(-71.064544 42.28787)') AS `pt`")
+#'   )
+#'   sedona_write_geoparquet(
+#'     tbl %>% dplyr::mutate(id = 1),
+#'     output_location = "/tmp/pts.geoparquet"
+#'   )
+#' }
+#'
+#' @family Sedona data interface functions
+#'
+#' @importFrom sparklyr spark_dataframe
+#' @export
+sedona_write_geoparquet <- function(x,
+                                   output_location) {
+  
+  ## Get back jobj
+  x_obj <- spark_dataframe(x)
+  
+  invoke(
+    x_obj, 
+    "%>%", 
+    list("write"), list("format", "geoparquet"), list("save", 
sparklyr:::spark_normalize_path(output_location))) 
+
+}
+
+# ------- Utilities ------------
 rdd_cls_from_type <- function(type = c("point", "polygon", "linestring")) {
   type <- match.arg(type)
 
diff --git a/R/R/sdf_interface.R b/R/R/sdf_interface.R
index 44bf751c..c0a5d5f7 100644
--- a/R/R/sdf_interface.R
+++ b/R/R/sdf_interface.R
@@ -60,7 +60,7 @@ sdf_register.spatial_rdd <- function(x, name = NULL) {
 #'
 #' @inheritParams as_spark_dataframe
 #' @param non_spatial_cols Column names for non-spatial attributes in the
-#'   resulting Spark Dataframe.
+#'   resulting Spark Dataframe. By default (NULL) it will import all field names if that property exists, in particular for shapefiles.
 #'
 #' @return A Spark Dataframe containing the imported spatial data.
 #'
@@ -86,6 +86,18 @@ sdf_register.spatial_rdd <- function(x, name = NULL) {
 #' @export
 as.spark.dataframe <- function(x, non_spatial_cols = NULL, name = NULL) {
   sc <- spark_connection(x$.jobj)
+  
+  # Default: keep all columns
+  if (is.null(non_spatial_cols)) {
+    if (!is.null(invoke(x$.jobj, "%>%", list("fieldNames")))) { ## Only if 
dataset has field names
+      non_spatial_cols <- invoke(x$.jobj, "%>%", list("fieldNames"), 
list("toString")) ### Get columns names
+      non_spatial_cols <- gsub("(^\\[|\\]$)", "", non_spatial_cols)  ##### 
remove brackets
+      non_spatial_cols <- strsplit(non_spatial_cols, ", ")[[1]]  ##### turn 
into list
+    }
+  } else {
+    stopifnot("non_spatial_cols needs to be a charcter vector (or NULL, 
default)" = is.character(non_spatial_cols))
+  }
+  
   sdf <- invoke_static(
     sc,
     "org.apache.sedona.sql.utils.Adapter",
diff --git a/R/tests/testthat/test-data-interface.R 
b/R/tests/testthat/test-data-interface.R
index d6a58bef..f10950de 100644
--- a/R/tests/testthat/test-data-interface.R
+++ b/R/tests/testthat/test-data-interface.R
@@ -23,6 +23,10 @@ shapefile <- function(filename) {
   test_data(file.path("shapefiles", filename))
 }
 
+geoparquet <- function(filename) {
+  test_data(file.path("geoparquet", filename))
+}
+
 test_rdd_with_non_spatial_attrs <- invoke_new(
   sc,
   "org.apache.sedona.core.spatialRDD.PointRDD",
@@ -75,7 +79,7 @@ test_that("sedona_read_dsv_to_typed_rdd() creates PointRDD 
correctly", {
     first_spatial_col_index = 1,
     has_non_spatial_attrs = TRUE
   )
-
+  
   expect_equal(class(pt_rdd), c("point_rdd", "spatial_rdd"))
   expect_equal(pt_rdd$.jobj %>% invoke("approximateTotalCount"), 3000)
   expect_boundary_envelope(pt_rdd, c(-173.120769, -84.965961, 30.244859, 
71.355134))
@@ -103,7 +107,7 @@ test_that("sedona_read_dsv_to_typed_rdd() creates 
PolygonRDD correctly", {
     first_spatial_col_index = 0,
     has_non_spatial_attrs = FALSE
   )
-
+  
   expect_equal(class(polygon_rdd), c("polygon_rdd", "spatial_rdd"))
   expect_equal(polygon_rdd$.jobj %>% invoke("approximateTotalCount"), 3000)
   expect_boundary_envelope(polygon_rdd, c(-158.104182, -66.03575, 17.986328, 
48.645133))
@@ -118,7 +122,7 @@ test_that("sedona_read_dsv_to_typed_rdd() creates 
LineStringRDD correctly", {
     first_spatial_col_index = 0,
     has_non_spatial_attrs = FALSE
   )
-
+  
   expect_equal(class(linestring_rdd), c("linestring_rdd", "spatial_rdd"))
   expect_equal(linestring_rdd$.jobj %>% invoke("approximateTotalCount"), 3000)
   expect_boundary_envelope(linestring_rdd, c(-123.393766, -65.648659, 
17.982169, 49.002374))
@@ -130,7 +134,7 @@ test_that("sedona_read_geojson_to_typed_rdd() creates 
PointRDD correctly", {
     location = test_data("points.json"),
     type = "point"
   )
-
+  
   expect_equal(class(pt_rdd), c("point_rdd", "spatial_rdd"))
   expect_equal(pt_rdd$.jobj %>% invoke("approximateTotalCount"), 7)
   expect_boundary_envelope(pt_rdd, c(-88.1234, -85.3333, 31.3699, 34.9876))
@@ -156,7 +160,7 @@ test_that("sedona_read_geojson_to_typed_rdd() creates 
PolygonRDD correctly", {
     type = "polygon",
     has_non_spatial_attrs = TRUE
   )
-
+  
   expect_equal(class(polygon_rdd), c("polygon_rdd", "spatial_rdd"))
   expect_equal(polygon_rdd$.jobj %>% invoke("approximateTotalCount"), 1001)
   expect_false(is.null(polygon_rdd$.jobj %>% invoke("boundaryEnvelope")))
@@ -182,7 +186,7 @@ test_that("sedona_read_geojson_to_typed_rdd() creates 
PolygonRDD correctly", {
 
 test_that("sedona_read_geojson() works as expected on geojson input with 
'type' and 'geometry' properties", {
   geojson_rdd <- sedona_read_geojson(sc, test_data("testPolygon.json"))
-
+  
   expect_equal(
     geojson_rdd$.jobj %>% invoke("%>%", list("rawSpatialRDD"), list("count")), 
1001
   )
@@ -190,7 +194,7 @@ test_that("sedona_read_geojson() works as expected on 
geojson input with 'type'
 
 test_that("sedona_read_geojson() works as expected on geojson input without 
'type' or 'geometry' properties", {
   geojson_rdd <- sedona_read_geojson(sc, 
test_data("testpolygon-no-property.json"))
-
+  
   expect_equal(
     geojson_rdd$.jobj %>% invoke("%>%", list("rawSpatialRDD"), list("count")), 
10
   )
@@ -198,7 +202,7 @@ test_that("sedona_read_geojson() works as expected on 
geojson input without 'typ
 
 test_that("sedona_read_geojson() works as expected on geojson input with null 
property value", {
   geojson_rdd <- sedona_read_geojson(sc, 
test_data("testpolygon-with-null-property-value.json"))
-
+  
   expect_equal(
     geojson_rdd$.jobj %>% invoke("%>%", list("rawSpatialRDD"), list("count")), 
3
   )
@@ -211,18 +215,18 @@ test_that("sedona_read_geojson() can skip invalid 
geometries correctly", {
     allow_invalid_geometries = TRUE,
     skip_syntactically_invalid_geometries = FALSE
   )
-
+  
   expect_equal(
     geojson_rdd$.jobj %>% invoke("%>%", list("rawSpatialRDD"), list("count")), 
3
   )
-
+  
   geojson_rdd <- sedona_read_geojson(
     sc,
     test_data("testInvalidPolygon.json"),
     allow_invalid_geometries = FALSE,
     skip_syntactically_invalid_geometries = FALSE
   )
-
+  
   expect_equal(
     geojson_rdd$.jobj %>% invoke("%>%", list("rawSpatialRDD"), list("count")), 
2
   )
@@ -230,7 +234,7 @@ test_that("sedona_read_geojson() can skip invalid 
geometries correctly", {
 
 test_that("sedona_read_wkb() works as expected", {
   wkb_rdd <- sedona_read_wkb(sc, test_data("county_small_wkb.tsv"))
-
+  
   expect_equal(
     wkb_rdd$.jobj %>% invoke("%>%", list("rawSpatialRDD"), list("count")), 103
   )
@@ -241,7 +245,7 @@ test_that("sedona_read_shapefile_to_typed_rdd() creates 
PointRDD correctly", {
     sc,
     location = shapefile("point"), type = "point"
   )
-
+  
   expect_equal(class(pt_rdd), c("point_rdd", "spatial_rdd"))
   expect_equal(
     pt_rdd$.jobj %>% invoke("%>%", list("rawSpatialRDD"), list("count")),
@@ -254,7 +258,7 @@ test_that("sedona_read_shapefile_to_typed_rdd() creates 
PolygonRDD correctly", {
     sc,
     location = shapefile("polygon"), type = "polygon"
   )
-
+  
   expect_equal(class(polygon_rdd), c("polygon_rdd", "spatial_rdd"))
   expect_equal(
     polygon_rdd$.jobj %>% invoke("%>%", list("rawSpatialRDD"), list("count")),
@@ -267,7 +271,7 @@ test_that("sedona_read_shapefile_to_typed_rdd() creates 
LineStringRDD correctly"
     sc,
     location = shapefile("polyline"), type = "linestring"
   )
-
+  
   expect_equal(class(linestring_rdd), c("linestring_rdd", "spatial_rdd"))
   expect_equal(
     linestring_rdd$.jobj %>% invoke("%>%", list("rawSpatialRDD"), 
list("count")),
@@ -277,12 +281,97 @@ test_that("sedona_read_shapefile_to_typed_rdd() creates 
LineStringRDD correctly"
 
 test_that("sedona_read_shapefile() works as expected", {
   wkb_rdd <- sedona_read_shapefile(sc, shapefile("polygon"))
-
+  
   expect_equal(
     wkb_rdd$.jobj %>% invoke("%>%", list("rawSpatialRDD"), list("count")), 
10000
   )
 })
 
+
+#### TESTS TO WRITE: writing
+
+test_that("sedona_read_geoparquet() works as expected", {
+  sdf_name <- random_string("spatial_sdf")
+  geoparquet_sdf <- sedona_read_geoparquet(sc, geoparquet("example1.parquet"), 
name = sdf_name)
+  
+  ## Right number of rows
+  geoparquet_df <-
+    geoparquet_sdf %>% 
+    sparklyr:::spark_sqlresult_from_dplyr()
+
+  expect_equal(
+    invoke(geoparquet_df, 'count'), 5
+  )
+  
+  ## Right registered name
+  expect_equal(geoparquet_sdf %>% dbplyr::remote_name(), 
dbplyr::ident(sdf_name))
+  
+  ## Right schema
+  expect_equivalent(
+    geoparquet_sdf %>% sdf_schema(),
+    list(
+      pop_est    = list(name = "pop_est", type = "LongType"), 
+      continent  = list(name = "continent", type = "StringType"), 
+      name       = list(name = "name", type = "StringType"), 
+      iso_a3     = list(name = "iso_a3", type = "StringType"), 
+      gdp_md_est = list(name = "gdp_md_est", type = "DoubleType"), 
+      geometry   = list(name = "geometry", type = "GeometryUDT")
+    )
+  )
+  
+  ## Right data (first row)
+  expect_equivalent(
+    geoparquet_sdf %>% head(1) %>% mutate(geometry = geometry %>% st_astext()) 
%>% collect() %>% as.list(),
+    list(
+      pop_est = 920938,
+      continent = "Oceania",
+      name = "Fiji",
+      iso_a3 = "FJI",
+      gdp_md_est = 8374,
+      geometry = "MULTIPOLYGON (((180 -16.067132663642447, 180 
-16.555216566639196, 179.36414266196414 -16.801354076946883, 178.72505936299711 
-17.01204167436804, 178.59683859511713 -16.639150000000004, 179.0966093629971 
-16.433984277547403, 179.4135093629971 -16.379054277547404, 180 
-16.067132663642447)), ((178.12557 -17.50481, 178.3736 -17.33992, 178.71806 
-17.62846, 178.55271 -18.15059, 177.93266000000003 -18.28799, 177.38146 
-18.16432, 177.28504 -17.72465, 177.67087 -17.3811400000000 [...]
+    )
+  )
+ 
+   ## Spatial predicate
+  filtered <- 
+    geoparquet_sdf %>% 
+    filter(ST_Intersects(ST_Point(35.174722, -6.552465), geometry)) %>% 
+    collect()
+  expect_equal(filtered %>% nrow(), 1)
+  expect_equal(filtered$name, "Tanzania")
+ 
+})
+
+
+test_that("sedona_write_geoparquet() works as expected", {
+  geoparquet_sdf <- sedona_read_geoparquet(sc, geoparquet("example2.parquet"))
+  tmp_dest <- tempfile(fileext = ".parquet")
+  
+  ## Save
+  geoparquet_sdf %>% sedona_write_geoparquet(tmp_dest)
+  
+  ### Reload
+  geoparquet_2_sdf <- sedona_read_geoparquet(sc, tmp_dest)
+  
+  expect_equivalent(
+    geoparquet_sdf %>% mutate(geometry = geometry %>% st_astext()) %>% 
collect(),
+    geoparquet_2_sdf %>% mutate(geometry = geometry %>% st_astext()) %>% 
collect()
+  )
+  
+  unlink(tmp_dest, recursive = TRUE)
+  
+})
+
+
+test_that("sedona_read_geoparquet() throws an error with plain parquet files", 
{
+
+  expect_error(
+    sedona_read_geoparquet(sc, geoparquet("plain.parquet")),
+    regexp = "GeoParquet file does not contain valid geo metadata"
+    )
+ 
+})
+
 test_that("sedona_write_wkb() works as expected", {
   output_location <- tempfile()
   sedona_write_wkb(test_rdd_with_non_spatial_attrs, output_location)
@@ -297,7 +386,7 @@ test_that("sedona_write_wkb() works as expected", {
     1L, # numPartitions
     sc$state$object_cache$storage_levels$memory_only
   )
-
+  
   expect_result_matches_original(pt_rdd)
 })
 
@@ -315,7 +404,7 @@ test_that("sedona_write_wkt() works as expected", {
     1L, # numPartitions
     sc$state$object_cache$storage_levels$memory_only
   )
-
+  
   expect_result_matches_original(pt_rdd)
 })
 
@@ -333,7 +422,7 @@ test_that("sedona_write_geojson() works as expected", {
     1L, # numPartitions
     sc$state$object_cache$storage_levels$memory_only
   )
-
+  
   expect_result_matches_original_geojson(pt_rdd)
 })
 
@@ -352,7 +441,7 @@ test_that("sedona_save_spatial_rdd() works as expected", {
         spatial_col = "pt", output_location = location, output_format = fmt
       )
     sdf <- do.call(paste0("sedona_read_", fmt), list(sc, location))
-
+    
     expect_equal(
       sdf$.jobj %>% invoke("%>%", list("rawSpatialRDD"), list("count")), 1
     )
diff --git a/R/tests/testthat/test-sdf-interface.R 
b/R/tests/testthat/test-sdf-interface.R
index 9131a5cb..8d6dd66a 100644
--- a/R/tests/testthat/test-sdf-interface.R
+++ b/R/tests/testthat/test-sdf-interface.R
@@ -21,20 +21,60 @@ sc <- testthat_spark_connection()
 
 pt_rdd <- read_point_rdd_with_non_spatial_attrs()
 
+shapefile <- function(filename) {
+  test_data(file.path("shapefiles", filename))
+}
+
 test_that("sdf_register() works as expected for Spatial RDDs", {
   sdf_name <- random_string("spatial_sdf")
   pt_sdf <- sdf_register(pt_rdd, name = sdf_name)
-
+  
   expect_equivalent(
     pt_sdf %>% sdf_schema(),
-    list(geometry = list(name = "geometry", type = "GeometryUDT"))
+    list(
+      geometry = list(name = "geometry", type = "GeometryUDT")
+      
+    )
   )
   expect_equal(pt_sdf %>% dbplyr::remote_name(), dbplyr::ident(sdf_name))
-
+  
   pt_sdf %>% collect()
   succeed()
 })
 
+
+test_that("sdf_register() works as expected for Spatial RDDs with fieldNames", 
{
+  sdf_name <- random_string("spatial_sdf")
+  polygon_rdd <- sedona_read_shapefile_to_typed_rdd(
+    sc,
+    location = shapefile("dbf"), type = "polygon"
+  )
+  polygon_sdf <- sdf_register(polygon_rdd, name = sdf_name)
+  
+  expect_equivalent(
+    polygon_sdf %>% sdf_schema(),
+    list(
+      geometry = list(name = "geometry", type = "GeometryUDT"),
+      geometry = list(name = "STATEFP", type = "StringType"),
+      geometry = list(name = "COUNTYFP", type = "StringType"),
+      geometry = list(name = "COUNTYNS", type = "StringType"),
+      geometry = list(name = "AFFGEOID", type = "StringType"),
+      geometry = list(name = "GEOID", type = "StringType"),
+      geometry = list(name = "NAME", type = "StringType"),
+      geometry = list(name = "LSAD", type = "StringType"),
+      geometry = list(name = "ALAND", type = "StringType"),
+      geometry = list(name = "AWATER", type = "StringType")
+      
+    )
+  )
+  
+  expect_equal(polygon_sdf %>% dbplyr::remote_name(), dbplyr::ident(sdf_name))
+  
+  polygon_sdf %>% collect()
+  succeed()
+})
+
+
 test_that("as.spark.dataframe() works as expected for Spatial RDDs with 
non-spatial attributes", {
   sdf_name <- random_string("spatial_sdf")
   pt_sdf <- as.spark.dataframe(

Reply via email to