This is an automated email from the ASF dual-hosted git repository. ruifengz pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push: new d5e9c5801cb [SPARK-39719][R] Implement databaseExists/getDatabase in SparkR support 3L namespace d5e9c5801cb is described below commit d5e9c5801cb1d0c8cb545b679261bd94b5ae0280 Author: Ruifeng Zheng <ruife...@apache.org> AuthorDate: Mon Jul 11 14:23:12 2022 +0800 [SPARK-39719][R] Implement databaseExists/getDatabase in SparkR support 3L namespace ### What changes were proposed in this pull request? 1. Add `databaseExists`/`getDatabase`. 2. Make sure `listTables` supports the 3L namespace. 3. Modify the SparkR-specific catalog methods `tables` and `tableNames` to support the 3L namespace. ### Why are the changes needed? To support the 3L (three-level) namespace in SparkR. ### Does this PR introduce _any_ user-facing change? Yes ### How was this patch tested? Updated unit tests and checked manually. Closes #37132 from zhengruifeng/r_3L_dbname. Authored-by: Ruifeng Zheng <ruife...@apache.org> Signed-off-by: Ruifeng Zheng <ruife...@apache.org> --- R/pkg/NAMESPACE | 2 + R/pkg/R/catalog.R | 72 +++++++++++++++++++++- R/pkg/pkgdown/_pkgdown_template.yml | 2 + R/pkg/tests/fulltests/test_sparkSQL.R | 34 +++++++++- .../org/apache/spark/sql/api/r/SQLUtils.scala | 2 +- 5 files changed, 107 insertions(+), 5 deletions(-) diff --git a/R/pkg/NAMESPACE b/R/pkg/NAMESPACE index f5f60ecf134..3937791421a 100644 --- a/R/pkg/NAMESPACE +++ b/R/pkg/NAMESPACE @@ -476,8 +476,10 @@ export("as.DataFrame", "createTable", "currentCatalog", "currentDatabase", + "databaseExists", "dropTempTable", "dropTempView", + "getDatabase", "getTable", "listCatalogs", "listColumns", diff --git a/R/pkg/R/catalog.R b/R/pkg/R/catalog.R index 8237ac26b33..680415ea6cd 100644 --- a/R/pkg/R/catalog.R +++ b/R/pkg/R/catalog.R @@ -278,13 +278,14 @@ dropTempView <- function(viewName) { #' Returns a SparkDataFrame containing names of tables in the given database. #' #' @param databaseName (optional) name of the database +#' The database name can be qualified with catalog name since 3.4.0.
#' @return a SparkDataFrame #' @rdname tables #' @seealso \link{listTables} #' @examples #'\dontrun{ #' sparkR.session() -#' tables("hive") +#' tables("spark_catalog.hive") #' } #' @name tables #' @note tables since 1.4.0 @@ -298,12 +299,13 @@ tables <- function(databaseName = NULL) { #' Returns the names of tables in the given database as an array. #' #' @param databaseName (optional) name of the database +#' The database name can be qualified with catalog name since 3.4.0. #' @return a list of table names #' @rdname tableNames #' @examples #'\dontrun{ #' sparkR.session() -#' tableNames("hive") +#' tableNames("spark_catalog.hive") #' } #' @name tableNames #' @note tableNames since 1.4.0 @@ -356,6 +358,28 @@ setCurrentDatabase <- function(databaseName) { invisible(handledCallJMethod(catalog, "setCurrentDatabase", databaseName)) } +#' Checks if the database with the specified name exists. +#' +#' Checks if the database with the specified name exists. +#' +#' @param databaseName name of the database, allowed to be qualified with catalog name +#' @rdname databaseExists +#' @name databaseExists +#' @examples +#' \dontrun{ +#' sparkR.session() +#' databaseExists("spark_catalog.default") +#' } +#' @note since 3.4.0 +databaseExists <- function(databaseName) { + sparkSession <- getSparkSession() + if (class(databaseName) != "character") { + stop("databaseName must be a string.") + } + catalog <- callJMethod(sparkSession, "catalog") + callJMethod(catalog, "databaseExists", databaseName) +} + #' Returns a list of databases available #' #' Returns a list of databases available. @@ -375,12 +399,54 @@ listDatabases <- function() { dataFrame(callJMethod(callJMethod(catalog, "listDatabases"), "toDF")) } +#' Get the database with the specified name +#' +#' Get the database with the specified name +#' +#' @param databaseName name of the database, allowed to be qualified with catalog name +#' @return A named list. 
+#' @rdname getDatabase +#' @name getDatabase +#' @examples +#' \dontrun{ +#' sparkR.session() +#' db <- getDatabase("default") +#' } +#' @note since 3.4.0 +getDatabase <- function(databaseName) { + sparkSession <- getSparkSession() + if (class(databaseName) != "character") { + stop("databaseName must be a string.") + } + catalog <- callJMethod(sparkSession, "catalog") + jdb <- handledCallJMethod(catalog, "getDatabase", databaseName) + + ret <- list(name = callJMethod(jdb, "name")) + jcata <- callJMethod(jdb, "catalog") + if (is.null(jcata)) { + ret$catalog <- NA + } else { + ret$catalog <- jcata + } + + jdesc <- callJMethod(jdb, "description") + if (is.null(jdesc)) { + ret$description <- NA + } else { + ret$description <- jdesc + } + + ret$locationUri <- callJMethod(jdb, "locationUri") + ret +} + #' Returns a list of tables or views in the specified database #' #' Returns a list of tables or views in the specified database. #' This includes all temporary views. #' #' @param databaseName (optional) name of the database +#' The database name can be qualified with catalog name since 3.4.0. #' @return a SparkDataFrame of the list of tables. 
#' @rdname listTables #' @name listTables @@ -389,7 +455,7 @@ listDatabases <- function() { #' \dontrun{ #' sparkR.session() #' listTables() -#' listTables("default") +#' listTables("spark_catalog.default") #' } #' @note since 2.2.0 listTables <- function(databaseName = NULL) { diff --git a/R/pkg/pkgdown/_pkgdown_template.yml b/R/pkg/pkgdown/_pkgdown_template.yml index a9107c1293e..df93f200ab2 100644 --- a/R/pkg/pkgdown/_pkgdown_template.yml +++ b/R/pkg/pkgdown/_pkgdown_template.yml @@ -263,8 +263,10 @@ reference: - contents: - currentCatalog - currentDatabase + - databaseExists - dropTempTable - dropTempView + - getDatabase - getTable - listCatalogs - listColumns diff --git a/R/pkg/tests/fulltests/test_sparkSQL.R b/R/pkg/tests/fulltests/test_sparkSQL.R index 29a6c2580e4..85eca6b510b 100644 --- a/R/pkg/tests/fulltests/test_sparkSQL.R +++ b/R/pkg/tests/fulltests/test_sparkSQL.R @@ -673,6 +673,22 @@ test_that("test tableNames and tables", { tables <- listTables() expect_equal(count(tables), count + 0) + + count2 <- count(listTables()) + schema <- structType(structField("name", "string"), structField("age", "integer"), + structField("height", "float")) + createTable("people", source = "json", schema = schema) + + expect_equal(length(tableNames()), count2 + 1) + expect_equal(length(tableNames("default")), count2 + 1) + expect_equal(length(tableNames("spark_catalog.default")), count2 + 1) + + tables <- listTables() + expect_equal(count(tables), count2 + 1) + expect_equal(count(tables()), count(tables)) + expect_equal(count(tables("default")), count2 + 1) + expect_equal(count(tables("spark_catalog.default")), count2 + 1) + sql("DROP TABLE IF EXISTS people") }) test_that( @@ -3422,6 +3438,8 @@ test_that("Method coltypes() to get and set R's data types of a DataFrame", { "Length of type vector should match the number of columns for SparkDataFrame") expect_error(coltypes(df) <- c("environment", "list"), "Only atomic type is supported for column types") + + 
dropTempView("dfView") }) test_that("Method str()", { @@ -3461,6 +3479,8 @@ test_that("Method str()", { # Test utils:::str expect_equal(capture.output(utils:::str(iris)), capture.output(str(iris))) + + dropTempView("irisView") }) test_that("Histogram", { @@ -4033,20 +4053,32 @@ test_that("catalog APIs, listCatalogs, setCurrentCatalog, currentCatalog", { catalogs <- collect(listCatalogs()) }) -test_that("catalog APIs, currentDatabase, setCurrentDatabase, listDatabases", { +test_that("catalog APIs, currentDatabase, setCurrentDatabase, listDatabases, getDatabase", { expect_equal(currentDatabase(), "default") expect_error(setCurrentDatabase("default"), NA) expect_error(setCurrentDatabase("zxwtyswklpf"), paste0("Error in setCurrentDatabase : no such database - Database ", "'zxwtyswklpf' not found")) + + expect_true(databaseExists("default")) + expect_true(databaseExists("spark_catalog.default")) + expect_false(databaseExists("some_db")) + expect_false(databaseExists("spark_catalog.some_db")) + dbs <- collect(listDatabases()) expect_equal(names(dbs), c("name", "catalog", "description", "locationUri")) expect_equal(which(dbs[, 1] == "default"), 1) + + db <- getDatabase("spark_catalog.default") + expect_equal(db$name, "default") + expect_equal(db$catalog, "spark_catalog") }) test_that("catalog APIs, listTables, listColumns, listFunctions, getTable", { tb <- listTables() count <- count(tables()) + expect_equal(nrow(listTables("default")), count) + expect_equal(nrow(listTables("spark_catalog.default")), count) expect_equal(nrow(tb), count) expect_equal(colnames(tb), c("name", "catalog", "namespace", "description", "tableType", "isTemporary")) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/api/r/SQLUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/api/r/SQLUtils.scala index f58afcfa05d..f505f55c259 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/api/r/SQLUtils.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/api/r/SQLUtils.scala @@ -217,7 
+217,7 @@ private[sql] object SQLUtils extends Logging { case _ => sparkSession.catalog.currentDatabase } - sparkSession.sessionState.catalog.listTables(db).map(_.table).toArray + sparkSession.catalog.listTables(db).collect().map(_.name) } def createArrayType(column: Column): ArrayType = { --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org