This is an automated email from the ASF dual-hosted git repository.
paleolimbot pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-nanoarrow.git
The following commit(s) were added to refs/heads/main by this push:
new 7fecddf refactor(r): Use `basic_array_stream()` to improve array
stream to data.frame conversion (#279)
7fecddf is described below
commit 7fecddf5d0b518468d320a93d36a24ca328752a7
Author: Dewey Dunnington <[email protected]>
AuthorDate: Tue Aug 22 15:53:55 2023 -0300
refactor(r): Use `basic_array_stream()` to improve array stream to
data.frame conversion (#279)
When collecting an array stream of unknown size into a data.frame,
nanoarrow performs very poorly. This is because it converts every batch
individually and then combines the results with `c()` or `rbind()`. This
is particularly bad when collecting many tiny batches (e.g., those
returned by many ADBC drivers).
`convert_array_stream()` has long had a "preallocate + fill" mode when
`size` is explicitly set. The recent addition of
`basic_array_stream()` makes it possible to recreate an array stream
from a previously collected list of batches. Together, this means we can
collect the whole stream, compute the total size, and then call
`convert_array_stream()` with a known size. In the benchmark below this is
only modestly faster than the old approach (median 479ms vs. 396ms), but it
drastically reduces memory allocation (429.3MB vs. 23.1MB) and leaves
fewer code paths that need testing.
It would be nice to support ALTREP in the future (at the very least for
strings); however, I don't envision having the bandwidth to implement
that before the next release.
@krlmlr Would you mind having a look at this to see if it seems
reasonable?
Before this PR:
``` r
library(nanoarrow)
data_frames <- replicate(
1000,
nanoarrow:::vec_gen(
data.frame(x = logical(), y = double(), z = character()),
n = 1000
),
simplify = FALSE
)
bench::mark(
convert_known_size = {
stream <- basic_array_stream(data_frames, validate = FALSE)
convert_array_stream(stream, size = 1000 * 1000)
},
convert_unknown_size = {
stream <- basic_array_stream(data_frames, validate = FALSE)
as.data.frame(stream)
},
convert_arrow_altrep = {
options(arrow.use_altrep = TRUE)
stream <- basic_array_stream(data_frames, validate = FALSE)
reader <- arrow::as_record_batch_reader(stream)
as.data.frame(as.data.frame(reader))
},
convert_arrow = {
options(arrow.use_altrep = FALSE)
stream <- basic_array_stream(data_frames, validate = FALSE)
reader <- arrow::as_record_batch_reader(stream)
as.data.frame(as.data.frame(reader))
},
min_iterations = 20
)
#> Warning: Some expressions had a GC in every iteration; so filtering is
#> disabled.
#> # A tibble: 4 × 6
#> expression min median `itr/sec` mem_alloc `gc/sec`
#> <bch:expr> <bch:tm> <bch:tm> <dbl> <bch:byt> <dbl>
#> 1 convert_known_size 196.9ms 234ms 3.85 23.1MB 4.23
#> 2 convert_unknown_size 375.8ms 479ms 2.12 429.3MB 13.2
#> 3 convert_arrow_altrep 67.4ms 164ms 4.78 20.4MB 6.70
#> 4 convert_arrow 107.8ms 240ms 2.96 22.9MB 3.56
```
After this PR:
``` r
#> Warning: Some expressions had a GC in every iteration; so filtering is
#> disabled.
#> # A tibble: 4 × 6
#> expression min median `itr/sec` mem_alloc `gc/sec`
#> <bch:expr> <bch:tm> <bch:tm> <dbl> <bch:byt> <dbl>
#> 1 convert_known_size 203.4ms 225ms 3.99 23.2MB 3.99
#> 2 convert_unknown_size 266.5ms 396ms 0.895 23.1MB 2.60
#> 3 convert_arrow_altrep 68.5ms 214ms 3.76 20.4MB 3.76
#> 4 convert_arrow 130.6ms 227ms 2.93 22.9MB 3.23
```
<sup>Created on 2023-08-17 with [reprex
v2.0.2](https://reprex.tidyverse.org)</sup>
---
r/NAMESPACE | 1 +
r/R/convert-array-stream.R | 104 +++++++++++++++++++++++++++++++-----------
r/man/as_nanoarrow_schema.Rd | 4 +-
r/man/convert_array_stream.Rd | 17 ++++++-
r/src/array.c | 14 ++----
r/src/array_stream.c | 12 +++++
r/src/init.c | 3 ++
r/src/util.h | 11 +++++
r/tools/make-callentries.R | 2 +
9 files changed, 127 insertions(+), 41 deletions(-)
diff --git a/r/NAMESPACE b/r/NAMESPACE
index 505aa33..51a4658 100644
--- a/r/NAMESPACE
+++ b/r/NAMESPACE
@@ -96,6 +96,7 @@ export(as_nanoarrow_array_stream)
export(as_nanoarrow_buffer)
export(as_nanoarrow_schema)
export(basic_array_stream)
+export(collect_array_stream)
export(convert_array)
export(convert_array_stream)
export(convert_buffer)
diff --git a/r/R/convert-array-stream.R b/r/R/convert-array-stream.R
index c91045b..b1f3e85 100644
--- a/r/R/convert-array-stream.R
+++ b/r/R/convert-array-stream.R
@@ -28,8 +28,11 @@
#' slightly more efficient implementation may be used to collect the output.
#' @param n The maximum number of batches to pull from the array stream.
#' @inheritParams convert_array
+#' @inheritParams basic_array_stream
#'
-#' @return An R vector of type `to`.
+#' @return
+#' - `convert_array_stream()`: An R vector of type `to`.
+#' - `collect_array_stream()`: A `list()` of
[nanoarrow_array][as_nanoarrow_array]
#' @export
#'
#' @examples
@@ -37,6 +40,9 @@
#' str(convert_array_stream(stream))
#' str(convert_array_stream(stream, to = data.frame(x = double())))
#'
+#' stream <- as_nanoarrow_array_stream(data.frame(x = 1:5))
+#' collect_array_stream(stream)
+#'
convert_array_stream <- function(array_stream, to = NULL, size = NULL, n =
Inf) {
stopifnot(
inherits(array_stream, "nanoarrow_array_stream")
@@ -50,38 +56,82 @@ convert_array_stream <- function(array_stream, to = NULL,
size = NULL, n = Inf)
}
n <- as.double(n)[1]
+
+
if (!is.null(size)) {
- return(
- .Call(
- nanoarrow_c_convert_array_stream,
- array_stream,
- to,
- as.double(size)[1],
- n
- )
+ # The underlying nanoarrow_c_convert_array_stream() currently requires that
+ # the total length of all batches is known in advance. If the caller
+ # provided this we can save a bit of work.
+ .Call(
+ nanoarrow_c_convert_array_stream,
+ array_stream,
+ to,
+ as.double(size)[1],
+ n
+ )
+ } else {
+ # Otherwise, we need to collect all batches and calculate the total length
+ # before calling nanoarrow_c_convert_array_stream().
+ batches <- collect_array_stream(
+ array_stream,
+ n,
+ schema = schema,
+ validate = FALSE
+ )
+
+ # If there is exactly one batch, use convert_array(). Converting a single
+ # array currently takes a more efficient code path for types that can be
+ # converted as ALTREP (e.g., strings).
+ if (length(batches) == 1L) {
+ return(.Call(nanoarrow_c_convert_array, batches[[1]], to))
+ }
+
+ # Otherwise, compute the final size, create another array stream,
+ # and call convert_array_stream() with a known size. Using .Call()
+ # directly because we have already type checked the inputs.
+ size <- .Call(nanoarrow_c_array_list_total_length, batches)
+ basic_stream <- .Call(nanoarrow_c_basic_array_stream, batches, schema,
FALSE)
+
+ .Call(
+ nanoarrow_c_convert_array_stream,
+ basic_stream,
+ to,
+ as.double(size),
+ Inf
)
}
+}
- batches <- vector("list", 1024L)
- n_batches <- 0L
- get_next <- array_stream$get_next
- while (!is.null(array <- get_next(schema, validate = FALSE)) && (n_batches <
n)) {
- n_batches <- n_batches + 1L
- batches[[n_batches]] <- .Call(nanoarrow_c_convert_array, array, to)
+#' @rdname convert_array_stream
+#' @export
+collect_array_stream <- function(array_stream, n = Inf, schema = NULL,
+ validate = TRUE) {
+ stopifnot(
+ inherits(array_stream, "nanoarrow_array_stream")
+ )
+
+ if (is.null(schema)) {
+ schema <- .Call(nanoarrow_c_array_stream_get_schema, array_stream)
}
- if (n_batches == 0L) {
- # vec_slice(to, 0)
- if (is.data.frame(to)) {
- to[integer(0), , drop = FALSE]
- } else {
- to[integer(0)]
+ batches <- vector("list", 1024L)
+ n_batches <- 0
+ get_next <- array_stream$get_next
+ while (n_batches < n) {
+ array <- get_next(schema, validate = validate)
+ if (is.null(array)) {
+ break
}
- } else if (n_batches == 1L) {
- batches[[1]]
- } else if (inherits(to, "data.frame")) {
- do.call(rbind, batches[seq_len(n_batches)])
- } else {
- do.call(c, batches[seq_len(n_batches)])
+
+ n_batches <- n_batches + 1
+
+ # This assignment has reasonable (but not great) performance when
+ # n_batches > 1024 in recent versions of R because R overallocates vectors
+ # slightly to support this pattern. It may be worth moving this
+ # implementation to C or C++ in the future if the collect step becomes a
+ # bottleneck.
+ batches[[n_batches]] <- array
}
+
+ batches[seq_len(n_batches)]
}
diff --git a/r/man/as_nanoarrow_schema.Rd b/r/man/as_nanoarrow_schema.Rd
index 20f97f6..db32ff3 100644
--- a/r/man/as_nanoarrow_schema.Rd
+++ b/r/man/as_nanoarrow_schema.Rd
@@ -33,8 +33,8 @@ An object of class 'nanoarrow_schema'
\description{
In nanoarrow a 'schema' refers to a \verb{struct ArrowSchema} as defined in the
Arrow C Data interface. This data structure can be used to represent an
-\code{\link[arrow:Schema]{arrow::schema()}}, an
\code{\link[arrow:Field]{arrow::field()}}, or an \code{arrow::DataType}. Note
that
-in nanoarrow, an \code{\link[arrow:Schema]{arrow::schema()}} and a
non-nullable \code{\link[arrow:data-type]{arrow::struct()}}
+\code{\link[arrow:schema]{arrow::schema()}}, an
\code{\link[arrow:Field]{arrow::field()}}, or an \code{arrow::DataType}. Note
that
+in nanoarrow, an \code{\link[arrow:schema]{arrow::schema()}} and a
non-nullable \code{\link[arrow:data-type]{arrow::struct()}}
are represented identically.
}
\examples{
diff --git a/r/man/convert_array_stream.Rd b/r/man/convert_array_stream.Rd
index 97e3608..3146ecb 100644
--- a/r/man/convert_array_stream.Rd
+++ b/r/man/convert_array_stream.Rd
@@ -2,9 +2,12 @@
% Please edit documentation in R/convert-array-stream.R
\name{convert_array_stream}
\alias{convert_array_stream}
+\alias{collect_array_stream}
\title{Convert an Array Stream into an R vector}
\usage{
convert_array_stream(array_stream, to = NULL, size = NULL, n = Inf)
+
+collect_array_stream(array_stream, n = Inf, schema = NULL, validate = TRUE)
}
\arguments{
\item{array_stream}{A
\link[=as_nanoarrow_array_stream]{nanoarrow_array_stream}.}
@@ -19,9 +22,18 @@ a function of \code{array} and the default inference of the
prototype.}
slightly more efficient implementation may be used to collect the output.}
\item{n}{The maximum number of batches to pull from the array stream.}
+
+\item{schema}{A \link[=as_nanoarrow_schema]{nanoarrow_schema} or \code{NULL}
to guess
+based on the first schema.}
+
+\item{validate}{Use \code{FALSE} to skip the validation step (i.e., if you
+know that the arrays are valid).}
}
\value{
-An R vector of type \code{to}.
+\itemize{
+\item \code{convert_array_stream()}: An R vector of type \code{to}.
+\item \code{collect_array_stream()}: A \code{list()} of
\link[=as_nanoarrow_array]{nanoarrow_array}
+}
}
\description{
Converts \code{array_stream} to the type specified by \code{to}. This is a
low-level
@@ -35,4 +47,7 @@ stream <- as_nanoarrow_array_stream(data.frame(x = 1:5))
str(convert_array_stream(stream))
str(convert_array_stream(stream, to = data.frame(x = double())))
+stream <- as_nanoarrow_array_stream(data.frame(x = 1:5))
+collect_array_stream(stream)
+
}
diff --git a/r/src/array.c b/r/src/array.c
index 2869f55..6a15f4a 100644
--- a/r/src/array.c
+++ b/r/src/array.c
@@ -375,14 +375,6 @@ static SEXP borrow_unknown_buffer(struct ArrowArray*
array, int64_t i, SEXP shel
return buffer_borrowed_xptr(array->buffers[i], 0, shelter);
}
-static SEXP length_from_int64(int64_t value) {
- if (value < INT_MAX) {
- return Rf_ScalarInteger(value);
- } else {
- return Rf_ScalarReal(value);
- }
-}
-
static SEXP borrow_buffer(struct ArrowArrayView* array_view, int64_t i, SEXP
shelter) {
SEXP buffer_class = PROTECT(Rf_allocVector(STRSXP, 2));
SET_STRING_ELT(buffer_class, 1, Rf_mkChar("nanoarrow_buffer"));
@@ -410,9 +402,9 @@ SEXP nanoarrow_c_array_proxy(SEXP array_xptr, SEXP
array_view_xptr, SEXP recursi
"children", "dictionary", ""};
SEXP array_proxy = PROTECT(Rf_mkNamed(VECSXP, names));
- SET_VECTOR_ELT(array_proxy, 0, length_from_int64(array->length));
- SET_VECTOR_ELT(array_proxy, 1, length_from_int64(array->null_count));
- SET_VECTOR_ELT(array_proxy, 2, length_from_int64(array->offset));
+ SET_VECTOR_ELT(array_proxy, 0, length_sexp_from_int64(array->length));
+ SET_VECTOR_ELT(array_proxy, 1, length_sexp_from_int64(array->null_count));
+ SET_VECTOR_ELT(array_proxy, 2, length_sexp_from_int64(array->offset));
if (array->n_buffers > 0) {
SEXP buffers = PROTECT(Rf_allocVector(VECSXP, array->n_buffers));
diff --git a/r/src/array_stream.c b/r/src/array_stream.c
index 5e73f19..6711a0b 100644
--- a/r/src/array_stream.c
+++ b/r/src/array_stream.c
@@ -141,6 +141,18 @@ SEXP nanoarrow_c_basic_array_stream(SEXP batches_sexp,
SEXP schema_xptr,
return array_stream_xptr;
}
+SEXP nanoarrow_c_array_list_total_length(SEXP list_of_array_xptr) {
+ int64_t total_length = 0;
+ R_xlen_t num_chunks = Rf_xlength(list_of_array_xptr);
+ for (R_xlen_t i = 0; i < num_chunks; i++) {
+ struct ArrowArray* chunk =
+ (struct ArrowArray*)R_ExternalPtrAddr(VECTOR_ELT(list_of_array_xptr,
i));
+ total_length += chunk->length;
+ }
+
+ return length_sexp_from_int64(total_length);
+}
+
// Implementation of an ArrowArrayStream that keeps a dependent object valid
struct WrapperArrayStreamData {
SEXP parent_array_stream_xptr;
diff --git a/r/src/init.c b/r/src/init.c
index 964ca19..95a4454 100644
--- a/r/src/init.c
+++ b/r/src/init.c
@@ -31,6 +31,7 @@ extern SEXP nanoarrow_c_array_stream_get_schema(SEXP
array_stream_xptr);
extern SEXP nanoarrow_c_array_stream_get_next(SEXP array_stream_xptr);
extern SEXP nanoarrow_c_basic_array_stream(SEXP batches_sexp, SEXP schema_xptr,
SEXP validate_sexp);
+extern SEXP nanoarrow_c_array_list_total_length(SEXP list_of_array_xptr);
extern SEXP nanoarrow_c_array_view(SEXP array_xptr, SEXP schema_xptr);
extern SEXP nanoarrow_c_array_init(SEXP schema_xptr);
extern SEXP nanoarrow_c_array_set_length(SEXP array_xptr, SEXP length_sexp);
@@ -102,6 +103,8 @@ static const R_CallMethodDef CallEntries[] = {
1},
{"nanoarrow_c_array_stream_get_next",
(DL_FUNC)&nanoarrow_c_array_stream_get_next, 1},
{"nanoarrow_c_basic_array_stream",
(DL_FUNC)&nanoarrow_c_basic_array_stream, 3},
+ {"nanoarrow_c_array_list_total_length",
(DL_FUNC)&nanoarrow_c_array_list_total_length,
+ 1},
{"nanoarrow_c_array_view", (DL_FUNC)&nanoarrow_c_array_view, 2},
{"nanoarrow_c_array_init", (DL_FUNC)&nanoarrow_c_array_init, 1},
{"nanoarrow_c_array_set_length", (DL_FUNC)&nanoarrow_c_array_set_length,
2},
diff --git a/r/src/util.h b/r/src/util.h
index 338cd56..28b1bde 100644
--- a/r/src/util.h
+++ b/r/src/util.h
@@ -55,4 +55,15 @@ static inline void check_trivial_alloc(const void* ptr,
const char* ptr_type) {
}
}
+// Returns an INTSXP for values below INT_MAX and a REALSXP otherwise, so
+// that lengths that cannot be represented as an R integer do not overflow.
+// This mirrors base R, which returns integer lengths whenever it can.
+static inline SEXP length_sexp_from_int64(int64_t value) {
+ if (value < INT_MAX) {
+ return Rf_ScalarInteger(value);
+ } else {
+ return Rf_ScalarReal(value);
+ }
+}
+
#endif
diff --git a/r/tools/make-callentries.R b/r/tools/make-callentries.R
index ace5488..403169c 100644
--- a/r/tools/make-callentries.R
+++ b/r/tools/make-callentries.R
@@ -74,3 +74,5 @@ stopifnot(str_detect(init, pattern))
init %>%
str_replace(pattern, header) %>%
write_file("src/init.c")
+
+system("clang-format -i src/init.c")