This is an automated email from the ASF dual-hosted git repository. ruifengz pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push: new c2f8f555dd7 [SPARK-39720][R] Implement tableExists/getTable in SparkR for 3L namespace c2f8f555dd7 is described below commit c2f8f555dd7a067625455b66c207cbfd113a8e6e Author: Ruifeng Zheng <ruife...@apache.org> AuthorDate: Mon Jul 11 12:02:00 2022 +0800 [SPARK-39720][R] Implement tableExists/getTable in SparkR for 3L namespace ### What changes were proposed in this pull request? 1, Implement tableExists/getTable 2, Update the documents of createTable/cacheTable/uncacheTable/refreshTable/recoverPartitions/listColumns ### Why are the changes needed? for 3L namespace ### Does this PR introduce _any_ user-facing change? yes, new method `tableExists` ### How was this patch tested? updated UT Closes #37133 from zhengruifeng/r_3L_tblname. Authored-by: Ruifeng Zheng <ruife...@apache.org> Signed-off-by: Ruifeng Zheng <ruife...@apache.org> --- R/pkg/NAMESPACE | 2 + R/pkg/R/catalog.R | 87 +++++++++++++++++++++++++++++++++-- R/pkg/pkgdown/_pkgdown_template.yml | 2 + R/pkg/tests/fulltests/test_sparkSQL.R | 46 ++++++++++++++---- 4 files changed, 125 insertions(+), 12 deletions(-) diff --git a/R/pkg/NAMESPACE b/R/pkg/NAMESPACE index 570f721ab41..f5f60ecf134 100644 --- a/R/pkg/NAMESPACE +++ b/R/pkg/NAMESPACE @@ -478,6 +478,7 @@ export("as.DataFrame", "currentDatabase", "dropTempTable", "dropTempView", + "getTable", "listCatalogs", "listColumns", "listDatabases", @@ -503,6 +504,7 @@ export("as.DataFrame", "spark.getSparkFiles", "sql", "str", + "tableExists", "tableToDF", "tableNames", "tables", diff --git a/R/pkg/R/catalog.R b/R/pkg/R/catalog.R index b10f73fb340..8237ac26b33 100644 --- a/R/pkg/R/catalog.R +++ b/R/pkg/R/catalog.R @@ -118,6 +118,7 @@ createExternalTable <- function(tableName, path = NULL, source = NULL, schema = #' #' @param tableName the qualified or unqualified name that designates a table. If no database #' identifier is provided, it refers to a table in the current database. 
+#' The table name can be fully qualified with catalog name since 3.4.0. #' @param path (optional) the path of files to load. #' @param source (optional) the name of the data source. #' @param schema (optional) the schema of the data required for some data sources. @@ -129,7 +130,7 @@ createExternalTable <- function(tableName, path = NULL, source = NULL, schema = #' sparkR.session() #' df <- createTable("myjson", path="path/to/json", source="json", schema) #' -#' createTable("people", source = "json", schema = schema) +#' createTable("spark_catalog.default.people", source = "json", schema = schema) #' insertInto(df, "people") #' } #' @name createTable @@ -160,6 +161,7 @@ createTable <- function(tableName, path = NULL, source = NULL, schema = NULL, .. #' #' @param tableName the qualified or unqualified name that designates a table. If no database #' identifier is provided, it refers to a table in the current database. +#' The table name can be fully qualified with catalog name since 3.4.0. #' @return SparkDataFrame #' @rdname cacheTable #' @examples @@ -184,6 +186,7 @@ cacheTable <- function(tableName) { #' #' @param tableName the qualified or unqualified name that designates a table. If no database #' identifier is provided, it refers to a table in the current database. +#' The table name can be fully qualified with catalog name since 3.4.0. #' @return SparkDataFrame #' @rdname uncacheTable #' @examples @@ -403,6 +406,78 @@ listTables <- function(databaseName = NULL) { dataFrame(callJMethod(jdst, "toDF")) } +#' Checks if the table with the specified name exists. +#' +#' Checks if the table with the specified name exists. 
+#' +#' @param tableName name of the table, allowed to be qualified with catalog name +#' @rdname tableExists +#' @name tableExists +#' @examples +#' \dontrun{ +#' sparkR.session() +#' tableExists("spark_catalog.default.myTable") +#' } +#' @note since 3.4.0 +tableExists <- function(tableName) { + sparkSession <- getSparkSession() + if (class(tableName) != "character") { + stop("tableName must be a string.") + } + catalog <- callJMethod(sparkSession, "catalog") + callJMethod(catalog, "tableExists", tableName) +} + +#' Get the table with the specified name +#' +#' Get the table with the specified name +#' +#' @param tableName the qualified or unqualified name that designates a table, allowed to be +#' qualified with catalog name +#' @return A named list. +#' @rdname getTable +#' @name getTable +#' @examples +#' \dontrun{ +#' sparkR.session() +#' tbl <- getTable("spark_catalog.default.myTable") +#' } +#' @note since 3.4.0 +getTable <- function(tableName) { + sparkSession <- getSparkSession() + if (class(tableName) != "character") { + stop("tableName must be a string.") + } + catalog <- callJMethod(sparkSession, "catalog") + jtbl <- handledCallJMethod(catalog, "getTable", tableName) + + ret <- list(name = callJMethod(jtbl, "name")) + jcata <- callJMethod(jtbl, "catalog") + if (is.null(jcata)) { + ret$catalog <- NA + } else { + ret$catalog <- jcata + } + + jns <- callJMethod(jtbl, "namespace") + if (is.null(jns)) { + ret$namespace <- NA + } else { + ret$namespace <- jns + } + + jdesc <- callJMethod(jtbl, "description") + if (is.null(jdesc)) { + ret$description <- NA + } else { + ret$description <- jdesc + } + + ret$tableType <- callJMethod(jtbl, "tableType") + ret$isTemporary <- callJMethod(jtbl, "isTemporary") + ret +} + +#' Returns a list of columns for the given table/view in the specified database +#' +#' Returns a list of columns for the given table/view in the specified database. 
@@ -410,6 +485,8 @@ listTables <- function(databaseName = NULL) { #' @param tableName the qualified or unqualified name that designates a table/view. If no database #' identifier is provided, it refers to a table/view in the current database. #' If \code{databaseName} parameter is specified, this must be an unqualified name. +#' The table name can be qualified with catalog name since 3.4.0, when databaseName +#' is NULL. #' @param databaseName (optional) name of the database #' @return a SparkDataFrame of the list of column descriptions. #' @rdname listColumns @@ -417,7 +494,7 @@ listTables <- function(databaseName = NULL) { #' @examples #' \dontrun{ #' sparkR.session() -#' listColumns("mytable") +#' listColumns("spark_catalog.default.mytable") #' } #' @note since 2.2.0 listColumns <- function(tableName, databaseName = NULL) { @@ -470,12 +547,13 @@ listFunctions <- function(databaseName = NULL) { #' #' @param tableName the qualified or unqualified name that designates a table. If no database #' identifier is provided, it refers to a table in the current database. +#' The table name can be fully qualified with catalog name since 3.4.0. #' @rdname recoverPartitions #' @name recoverPartitions #' @examples #' \dontrun{ #' sparkR.session() -#' recoverPartitions("myTable") +#' recoverPartitions("spark_catalog.default.myTable") #' } #' @note since 2.2.0 recoverPartitions <- function(tableName) { @@ -496,12 +574,13 @@ recoverPartitions <- function(tableName) { #' #' @param tableName the qualified or unqualified name that designates a table. If no database #' identifier is provided, it refers to a table in the current database. +#' The table name can be fully qualified with catalog name since 3.4.0. 
#' @rdname refreshTable #' @name refreshTable #' @examples #' \dontrun{ #' sparkR.session() -#' refreshTable("myTable") +#' refreshTable("spark_catalog.default.myTable") #' } #' @note since 2.2.0 refreshTable <- function(tableName) { diff --git a/R/pkg/pkgdown/_pkgdown_template.yml b/R/pkg/pkgdown/_pkgdown_template.yml index d487b51ec5d..a9107c1293e 100644 --- a/R/pkg/pkgdown/_pkgdown_template.yml +++ b/R/pkg/pkgdown/_pkgdown_template.yml @@ -265,6 +265,7 @@ reference: - currentDatabase - dropTempTable - dropTempView + - getTable - listCatalogs - listColumns - listDatabases @@ -275,6 +276,7 @@ reference: - recoverPartitions - setCurrentCatalog - setCurrentDatabase + - tableExists - tableNames - tables - uncacheTable diff --git a/R/pkg/tests/fulltests/test_sparkSQL.R b/R/pkg/tests/fulltests/test_sparkSQL.R index 9586d8b45a5..29a6c2580e4 100644 --- a/R/pkg/tests/fulltests/test_sparkSQL.R +++ b/R/pkg/tests/fulltests/test_sparkSQL.R @@ -696,16 +696,27 @@ test_that( expect_true(dropTempView("dfView")) }) -test_that("test cache, uncache and clearCache", { - df <- read.json(jsonPath) - createOrReplaceTempView(df, "table1") - cacheTable("table1") - uncacheTable("table1") +test_that("test tableExists, cache, uncache and clearCache", { + schema <- structType(structField("name", "string"), structField("age", "integer"), + structField("height", "float")) + createTable("table1", source = "json", schema = schema) + + cacheTable("default.table1") + uncacheTable("spark_catalog.default.table1") clearCache() - expect_true(dropTempView("table1")) expect_error(uncacheTable("zxwtyswklpf"), "Error in uncacheTable : analysis error - Table or view not found: zxwtyswklpf") + + expect_true(tableExists("table1")) + expect_true(tableExists("default.table1")) + expect_true(tableExists("spark_catalog.default.table1")) + + sql("DROP TABLE IF EXISTS spark_catalog.default.table1") + + expect_false(tableExists("table1")) + expect_false(tableExists("default.table1")) + 
expect_false(tableExists("spark_catalog.default.table1")) }) test_that("insertInto() on a registered table", { @@ -1342,7 +1353,7 @@ test_that("test HiveContext", { schema <- structType(structField("name", "string"), structField("age", "integer"), structField("height", "float")) - createTable("people", source = "json", schema = schema) + createTable("spark_catalog.default.people", source = "json", schema = schema) df <- read.df(jsonPathNa, "json", schema) insertInto(df, "people") expect_equal(collect(sql("SELECT age from people WHERE name = 'Bob'"))$age, c(16)) @@ -4033,7 +4044,7 @@ test_that("catalog APIs, currentDatabase, setCurrentDatabase, listDatabases", { expect_equal(which(dbs[, 1] == "default"), 1) }) -test_that("catalog APIs, listTables, listColumns, listFunctions", { +test_that("catalog APIs, listTables, listColumns, listFunctions, getTable", { tb <- listTables() count <- count(tables()) expect_equal(nrow(tb), count) @@ -4075,7 +4086,26 @@ test_that("catalog APIs, listTables, listColumns, listFunctions", { expect_error(refreshTable("cars"), NA) expect_error(refreshByPath("/"), NA) + view <- getTable("cars") + expect_equal(view$name, "cars") + expect_equal(view$tableType, "TEMPORARY") + expect_true(view$isTemporary) + dropTempView("cars") + + schema <- structType(structField("name", "string"), structField("age", "integer"), + structField("height", "float")) + createTable("default.people", source = "json", schema = schema) + + tbl <- getTable("spark_catalog.default.people") + expect_equal(tbl$name, "people") + expect_equal(tbl$catalog, "spark_catalog") + expect_equal(length(tbl$namespace), 1) + expect_equal(tbl$namespace[[1]], "default") + expect_equal(tbl$tableType, "MANAGED") + expect_false(tbl$isTemporary) + + sql("DROP TABLE IF EXISTS people") }) test_that("assert_true, raise_error", { --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: 
commits-h...@spark.apache.org