This is an automated email from the ASF dual-hosted git repository.
npr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git
The following commit(s) were added to refs/heads/master by this push:
new 7448322ebe ARROW-17252: [R] Intermittent valgrind failure (#13773)
7448322ebe is described below
commit 7448322ebe34c6efae413a52338ebf7efa1a6069
Author: Dewey Dunnington <[email protected]>
AuthorDate: Tue Aug 9 07:57:57 2022 -0300
ARROW-17252: [R] Intermittent valgrind failure (#13773)
This PR fixes intermittent leaks that appeared after one of the changes from
ARROW-16444: when the `RecordBatchReader` emitted from the plan is drained too
quickly, some parts of the plan can apparently leak (I don't know why this
happens).
I tried removing various pieces of the `RunWithCapturedR()` changes (see
#13746) but the only thing that removes the errors completely is draining the
resulting `RecordBatchReader` from R (i.e., `reader$read_table()`) instead of
in C++ (i.e., `reader->ToTable()`). Unfortunately, for user-defined functions
to work in a plan we need a C++ level `reader->ToTable()`. I took the approach
here of disabling the C++ level read by default, requiring a user to opt in to
the version of `collect( [...]
I was able to replicate the original leaks, but they are few and far
between. Our tests just happen to create and destroy many, many exec plans, and
something about the CI environment seems to trigger the leaks more reliably
(although the errors don't always occur there, either). Most of the leaks are
small, but there were some instances where an entire `Table` leaked.
Authored-by: Dewey Dunnington <[email protected]>
Signed-off-by: Neal Richardson <[email protected]>
---
r/R/compute.R | 9 ++++++-
r/R/table.R | 9 ++++++-
r/man/register_scalar_function.Rd | 2 +-
r/tests/testthat/test-compute.R | 51 ++++++++++++++++++++++++++++++---------
4 files changed, 57 insertions(+), 14 deletions(-)
diff --git a/r/R/compute.R b/r/R/compute.R
index 0985e73a5f..636c9146ca 100644
--- a/r/R/compute.R
+++ b/r/R/compute.R
@@ -344,7 +344,7 @@ cast_options <- function(safe = TRUE, ...) {
#' @return `NULL`, invisibly
#' @export
#'
-#' @examplesIf arrow_with_dataset()
+#' @examplesIf arrow_with_dataset() && identical(Sys.getenv("NOT_CRAN"),
"true")
#' library(dplyr, warn.conflicts = FALSE)
#'
#' some_model <- lm(mpg ~ disp + cyl, data = mtcars)
@@ -385,6 +385,13 @@ register_scalar_function <- function(name, fun, in_type,
out_type,
update_cache = TRUE
)
+ # User-defined functions require some special handling
+ # in the query engine, which currently requires an opt-in using
+ # the R_ARROW_COLLECT_WITH_UDF environment variable while this
+ # behaviour is stabilized.
+ # TODO(ARROW-17178) remove the need for this!
+ Sys.setenv(R_ARROW_COLLECT_WITH_UDF = "true")
+
invisible(NULL)
}
diff --git a/r/R/table.R b/r/R/table.R
index 5579c676d5..d7e276415c 100644
--- a/r/R/table.R
+++ b/r/R/table.R
@@ -331,5 +331,12 @@ as_arrow_table.arrow_dplyr_query <- function(x, ...) {
# See query-engine.R for ExecPlan/Nodes
plan <- ExecPlan$create()
final_node <- plan$Build(x)
- plan$Run(final_node, as_table = TRUE)
+
+ run_with_event_loop <- identical(
+ Sys.getenv("R_ARROW_COLLECT_WITH_UDF", ""),
+ "true"
+ )
+
+ result <- plan$Run(final_node, as_table = run_with_event_loop)
+ as_arrow_table(result)
}
diff --git a/r/man/register_scalar_function.Rd
b/r/man/register_scalar_function.Rd
index 4da8f54f64..324dd5fad1 100644
--- a/r/man/register_scalar_function.Rd
+++ b/r/man/register_scalar_function.Rd
@@ -48,7 +48,7 @@ stateless and return output with the same shape (i.e., the
same number
of rows) as the input.
}
\examples{
-\dontshow{if (arrow_with_dataset()) (if (getRversion() >= "3.4") withAutoprint
else force)(\{ # examplesIf}
+\dontshow{if (arrow_with_dataset() && identical(Sys.getenv("NOT_CRAN"),
"true")) (if (getRversion() >= "3.4") withAutoprint else force)(\{ # examplesIf}
library(dplyr, warn.conflicts = FALSE)
some_model <- lm(mpg ~ disp + cyl, data = mtcars)
diff --git a/r/tests/testthat/test-compute.R b/r/tests/testthat/test-compute.R
index 9e487169f4..5821c0fa2d 100644
--- a/r/tests/testthat/test-compute.R
+++ b/r/tests/testthat/test-compute.R
@@ -81,6 +81,9 @@ test_that("arrow_scalar_function() works with auto_convert =
TRUE", {
test_that("register_scalar_function() adds a compute function to the
registry", {
skip_if_not(CanRunWithCapturedR())
+ # TODO(ARROW-17178): User-defined function-friendly ExecPlan execution has
+ # occasional valgrind errors
+ skip_on_linux_devel()
register_scalar_function(
"times_32",
@@ -88,7 +91,11 @@ test_that("register_scalar_function() adds a compute
function to the registry",
int32(), float64(),
auto_convert = TRUE
)
- on.exit(unregister_binding("times_32", update_cache = TRUE))
+ on.exit({
+ unregister_binding("times_32", update_cache = TRUE)
+ # TODO(ARROW-17178) remove the need for this!
+ Sys.unsetenv("R_ARROW_COLLECT_WITH_UDF")
+ })
expect_true("times_32" %in% names(asNamespace("arrow")$.cache$functions))
expect_true("times_32" %in% list_compute_functions())
@@ -120,9 +127,11 @@ test_that("arrow_scalar_function() with bad return type
errors", {
int32(),
float64()
)
- on.exit(
+ on.exit({
unregister_binding("times_32_bad_return_type_array", update_cache = TRUE)
- )
+ # TODO(ARROW-17178) remove the need for this!
+ Sys.unsetenv("R_ARROW_COLLECT_WITH_UDF")
+ })
expect_error(
call_function("times_32_bad_return_type_array", Array$create(1L)),
@@ -135,9 +144,11 @@ test_that("arrow_scalar_function() with bad return type
errors", {
int32(),
float64()
)
- on.exit(
+ on.exit({
unregister_binding("times_32_bad_return_type_scalar", update_cache = TRUE)
- )
+ # TODO(ARROW-17178) remove the need for this!
+ Sys.unsetenv("R_ARROW_COLLECT_WITH_UDF")
+ })
expect_error(
call_function("times_32_bad_return_type_scalar", Array$create(1L)),
@@ -145,7 +156,7 @@ test_that("arrow_scalar_function() with bad return type
errors", {
)
})
-test_that("register_user_defined_function() can register multiple kernels", {
+test_that("register_scalar_function() can register multiple kernels", {
skip_if_not(CanRunWithCapturedR())
register_scalar_function(
@@ -155,7 +166,11 @@ test_that("register_user_defined_function() can register
multiple kernels", {
out_type = function(in_types) in_types[[1]],
auto_convert = TRUE
)
- on.exit(unregister_binding("times_32", update_cache = TRUE))
+ on.exit({
+ unregister_binding("times_32", update_cache = TRUE)
+ # TODO(ARROW-17178) remove the need for this!
+ Sys.unsetenv("R_ARROW_COLLECT_WITH_UDF")
+ })
expect_equal(
call_function("times_32", Scalar$create(1L, int32())),
@@ -173,7 +188,10 @@ test_that("register_user_defined_function() can register
multiple kernels", {
)
})
-test_that("register_user_defined_function() errors for unsupported
specifications", {
+test_that("register_scalar_function() errors for unsupported specifications", {
+ # TODO(ARROW-17178) remove the need for this!
+ on.exit(Sys.unsetenv("R_ARROW_COLLECT_WITH_UDF"))
+
expect_error(
register_scalar_function(
"no_kernels",
@@ -208,7 +226,10 @@ test_that("register_user_defined_function() errors for
unsupported specification
test_that("user-defined functions work during multi-threaded execution", {
skip_if_not(CanRunWithCapturedR())
skip_if_not_available("dataset")
- # Snappy has a UBSan issue: https://github.com/google/snappy/pull/148
+ # Skip on linux devel because:
+ # TODO(ARROW-17283): Snappy has a UBSan issue that is fixed in the dev
version
+ # TODO(ARROW-17178): User-defined function-friendly ExecPlan execution has
+ # occasional valgrind errors
skip_on_linux_devel()
n_rows <- 10000
@@ -235,7 +256,11 @@ test_that("user-defined functions work during
multi-threaded execution", {
float64(),
auto_convert = TRUE
)
- on.exit(unregister_binding("times_32", update_cache = TRUE))
+ on.exit({
+ unregister_binding("times_32", update_cache = TRUE)
+ # TODO(ARROW-17178) remove the need for this!
+ Sys.unsetenv("R_ARROW_COLLECT_WITH_UDF")
+ })
# check a regular collect()
result <- open_dataset(tf_dataset) %>%
@@ -268,7 +293,11 @@ test_that("user-defined error when called from an
unsupported context", {
float64(),
auto_convert = TRUE
)
- on.exit(unregister_binding("times_32", update_cache = TRUE))
+ on.exit({
+ unregister_binding("times_32", update_cache = TRUE)
+ # TODO(ARROW-17178) remove the need for this!
+ Sys.unsetenv("R_ARROW_COLLECT_WITH_UDF")
+ })
stream_plan_with_udf <- function() {
record_batch(a = 1:1000) %>%