This is an automated email from the ASF dual-hosted git repository.

paleolimbot pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-nanoarrow.git


The following commit(s) were added to refs/heads/main by this push:
     new 7fecddf  refactor(r): Use `basic_array_stream()` to improve array 
stream to data.frame conversion (#279)
7fecddf is described below

commit 7fecddf5d0b518468d320a93d36a24ca328752a7
Author: Dewey Dunnington <[email protected]>
AuthorDate: Tue Aug 22 15:53:55 2023 -0300

    refactor(r): Use `basic_array_stream()` to improve array stream to 
data.frame conversion (#279)
    
    When collecting an array stream with unknown size into a data.frame,
    nanoarrow has pretty terrible performance. This is because it collects
    and converts all batches and does `c()` or `rbind()` on the result. This
    is particularly bad when collecting many tiny batches (e.g., like those
    returned by many ADBC drivers).
    
    `convert_array_stream()` has long had a "preallocate + fill" mode when
    `size` was explicitly set. Recently, the addition of
    `basic_array_stream()` makes it possible to recreate an array stream
    from a previously-collected result. Collectively, this means we can
    collect the whole stream, compute the size, and then call
    `convert_array_stream()` with a known size. This is only about twice as
    fast as the old approach but significantly reduces memory consumption
    and makes for fewer code paths that need testing.
    
    It would be nice to support ALTREP in the future (at the very least for
    strings); however, I don't envision having the bandwidth to implement
    that before the next release.
    
    @krlmlr Would you mind having a look at this to see if it seems
    reasonable?
    
    Before this PR:
    
    ``` r
    library(nanoarrow)
    
    data_frames <- replicate(
      1000,
      nanoarrow:::vec_gen(
        data.frame(x = logical(), y = double(), z = character()),
        n = 1000
      ),
      simplify = FALSE
    )
    
    bench::mark(
      convert_known_size = {
        stream <- basic_array_stream(data_frames, validate = FALSE)
        convert_array_stream(stream, size = 1000 * 1000)
      },
      convert_unknown_size = {
        stream <- basic_array_stream(data_frames, validate = FALSE)
        as.data.frame(stream)
      },
      convert_arrow_altrep = {
        options(arrow.use_altrep = TRUE)
        stream <- basic_array_stream(data_frames, validate = FALSE)
        reader <- arrow::as_record_batch_reader(stream)
        as.data.frame(as.data.frame(reader))
      },
      convert_arrow = {
        options(arrow.use_altrep = FALSE)
        stream <- basic_array_stream(data_frames, validate = FALSE)
        reader <- arrow::as_record_batch_reader(stream)
        as.data.frame(as.data.frame(reader))
      },
      min_iterations = 20
    )
    #> Warning: Some expressions had a GC in every iteration; so filtering is
    #> disabled.
    #> # A tibble: 4 × 6
    #>   expression                min   median `itr/sec` mem_alloc `gc/sec`
    #>   <bch:expr>           <bch:tm> <bch:tm>     <dbl> <bch:byt>    <dbl>
    #> 1 convert_known_size    196.9ms    234ms      3.85    23.1MB     4.23
    #> 2 convert_unknown_size  375.8ms    479ms      2.12   429.3MB    13.2
    #> 3 convert_arrow_altrep   67.4ms    164ms      4.78    20.4MB     6.70
    #> 4 convert_arrow         107.8ms    240ms      2.96    22.9MB     3.56
    ```
    
    After this PR:
    
    ``` r
    #> Warning: Some expressions had a GC in every iteration; so filtering is
    #> disabled.
    #> # A tibble: 4 × 6
    #>   expression                min   median `itr/sec` mem_alloc `gc/sec`
    #>   <bch:expr>           <bch:tm> <bch:tm>     <dbl> <bch:byt>    <dbl>
    #> 1 convert_known_size    203.4ms    225ms     3.99     23.2MB     3.99
    #> 2 convert_unknown_size  266.5ms    396ms     0.895    23.1MB     2.60
    #> 3 convert_arrow_altrep   68.5ms    214ms     3.76     20.4MB     3.76
    #> 4 convert_arrow         130.6ms    227ms     2.93     22.9MB     3.23
    ```
    
    <sup>Created on 2023-08-17 with [reprex
    v2.0.2](https://reprex.tidyverse.org)</sup>
---
 r/NAMESPACE                   |   1 +
 r/R/convert-array-stream.R    | 104 +++++++++++++++++++++++++++++++-----------
 r/man/as_nanoarrow_schema.Rd  |   4 +-
 r/man/convert_array_stream.Rd |  17 ++++++-
 r/src/array.c                 |  14 ++----
 r/src/array_stream.c          |  12 +++++
 r/src/init.c                  |   3 ++
 r/src/util.h                  |  11 +++++
 r/tools/make-callentries.R    |   2 +
 9 files changed, 127 insertions(+), 41 deletions(-)

diff --git a/r/NAMESPACE b/r/NAMESPACE
index 505aa33..51a4658 100644
--- a/r/NAMESPACE
+++ b/r/NAMESPACE
@@ -96,6 +96,7 @@ export(as_nanoarrow_array_stream)
 export(as_nanoarrow_buffer)
 export(as_nanoarrow_schema)
 export(basic_array_stream)
+export(collect_array_stream)
 export(convert_array)
 export(convert_array_stream)
 export(convert_buffer)
diff --git a/r/R/convert-array-stream.R b/r/R/convert-array-stream.R
index c91045b..b1f3e85 100644
--- a/r/R/convert-array-stream.R
+++ b/r/R/convert-array-stream.R
@@ -28,8 +28,11 @@
 #'   slightly more efficient implementation may be used to collect the output.
 #' @param n The maximum number of batches to pull from the array stream.
 #' @inheritParams convert_array
+#' @inheritParams basic_array_stream
 #'
-#' @return An R vector of type `to`.
+#' @return
+#'   - `convert_array_stream()`: An R vector of type `to`.
+#'   - `collect_array_stream()`: A `list()` of 
[nanoarrow_array][as_nanoarrow_array]
 #' @export
 #'
 #' @examples
@@ -37,6 +40,9 @@
 #' str(convert_array_stream(stream))
 #' str(convert_array_stream(stream, to = data.frame(x = double())))
 #'
+#' stream <- as_nanoarrow_array_stream(data.frame(x = 1:5))
+#' collect_array_stream(stream)
+#'
 convert_array_stream <- function(array_stream, to = NULL, size = NULL, n = 
Inf) {
   stopifnot(
     inherits(array_stream, "nanoarrow_array_stream")
@@ -50,38 +56,82 @@ convert_array_stream <- function(array_stream, to = NULL, 
size = NULL, n = Inf)
   }
 
   n <- as.double(n)[1]
+
+
   if (!is.null(size)) {
-    return(
-      .Call(
-        nanoarrow_c_convert_array_stream,
-        array_stream,
-        to,
-        as.double(size)[1],
-        n
-      )
+    # The underlying nanoarrow_c_convert_array_stream() currently requires that
+    # the total length of all batches is known in advance. If the caller
+    # provided this we can save a bit of work.
+    .Call(
+      nanoarrow_c_convert_array_stream,
+      array_stream,
+      to,
+      as.double(size)[1],
+      n
+    )
+  } else {
+    # Otherwise, we need to collect all batches and calculate the total length
+    # before calling nanoarrow_c_convert_array_stream().
+    batches <- collect_array_stream(
+      array_stream,
+      n,
+      schema = schema,
+      validate = FALSE
+    )
+
+    # If there is exactly one batch, use convert_array(). Converting a single
+    # array currently takes a more efficient code path for types that can be
+    # converted as ALTREP (e.g., strings).
+    if (length(batches) == 1L) {
+      return(.Call(nanoarrow_c_convert_array, batches[[1]], to))
+    }
+
+    # Otherwise, compute the final size, create another array stream,
+    # and call convert_array_stream() with a known size. Using .Call()
+    # directly because we have already type checked the inputs.
+    size <- .Call(nanoarrow_c_array_list_total_length, batches)
+    basic_stream <- .Call(nanoarrow_c_basic_array_stream, batches, schema, 
FALSE)
+
+    .Call(
+      nanoarrow_c_convert_array_stream,
+      basic_stream,
+      to,
+      as.double(size),
+      Inf
     )
   }
+}
 
-  batches <- vector("list", 1024L)
-  n_batches <- 0L
-  get_next <- array_stream$get_next
-  while (!is.null(array <- get_next(schema, validate = FALSE)) && (n_batches < 
n)) {
-    n_batches <- n_batches + 1L
-    batches[[n_batches]] <- .Call(nanoarrow_c_convert_array, array, to)
+#' @rdname convert_array_stream
+#' @export
+collect_array_stream <- function(array_stream, n = Inf, schema = NULL,
+                                 validate = TRUE) {
+  stopifnot(
+    inherits(array_stream, "nanoarrow_array_stream")
+  )
+
+  if (is.null(schema)) {
+    schema <- .Call(nanoarrow_c_array_stream_get_schema, array_stream)
   }
 
-  if (n_batches == 0L) {
-    # vec_slice(to, 0)
-    if (is.data.frame(to)) {
-      to[integer(0), , drop = FALSE]
-    } else {
-      to[integer(0)]
+  batches <- vector("list", 1024L)
+  n_batches <- 0
+  get_next <- array_stream$get_next
+  while (n_batches < n) {
+    array <- get_next(schema, validate = validate)
+    if (is.null(array)) {
+      break
     }
-  } else if (n_batches == 1L) {
-    batches[[1]]
-  } else if (inherits(to, "data.frame")) {
-    do.call(rbind, batches[seq_len(n_batches)])
-  } else {
-    do.call(c, batches[seq_len(n_batches)])
+
+    n_batches <- n_batches + 1
+
+    # This assignment has reasonable (but not great) performance when
+    # n_batches > 1024 in recent versions of R because R overallocates vectors
+    # slightly to support this pattern. It may be worth moving this
+    # implementation to C or C++ in the future if the collect step becomes a
+    # bottleneck.
+    batches[[n_batches]] <- array
   }
+
+  batches[seq_len(n_batches)]
 }
diff --git a/r/man/as_nanoarrow_schema.Rd b/r/man/as_nanoarrow_schema.Rd
index 20f97f6..db32ff3 100644
--- a/r/man/as_nanoarrow_schema.Rd
+++ b/r/man/as_nanoarrow_schema.Rd
@@ -33,8 +33,8 @@ An object of class 'nanoarrow_schema'
 \description{
 In nanoarrow a 'schema' refers to a \verb{struct ArrowSchema} as defined in the
 Arrow C Data interface. This data structure can be used to represent an
-\code{\link[arrow:Schema]{arrow::schema()}}, an 
\code{\link[arrow:Field]{arrow::field()}}, or an \code{arrow::DataType}. Note 
that
-in nanoarrow, an \code{\link[arrow:Schema]{arrow::schema()}} and a 
non-nullable \code{\link[arrow:data-type]{arrow::struct()}}
+\code{\link[arrow:schema]{arrow::schema()}}, an 
\code{\link[arrow:Field]{arrow::field()}}, or an \code{arrow::DataType}. Note 
that
+in nanoarrow, an \code{\link[arrow:schema]{arrow::schema()}} and a 
non-nullable \code{\link[arrow:data-type]{arrow::struct()}}
 are represented identically.
 }
 \examples{
diff --git a/r/man/convert_array_stream.Rd b/r/man/convert_array_stream.Rd
index 97e3608..3146ecb 100644
--- a/r/man/convert_array_stream.Rd
+++ b/r/man/convert_array_stream.Rd
@@ -2,9 +2,12 @@
 % Please edit documentation in R/convert-array-stream.R
 \name{convert_array_stream}
 \alias{convert_array_stream}
+\alias{collect_array_stream}
 \title{Convert an Array Stream into an R vector}
 \usage{
 convert_array_stream(array_stream, to = NULL, size = NULL, n = Inf)
+
+collect_array_stream(array_stream, n = Inf, schema = NULL, validate = TRUE)
 }
 \arguments{
 \item{array_stream}{A 
\link[=as_nanoarrow_array_stream]{nanoarrow_array_stream}.}
@@ -19,9 +22,18 @@ a function of \code{array} and the default inference of the 
prototype.}
 slightly more efficient implementation may be used to collect the output.}
 
 \item{n}{The maximum number of batches to pull from the array stream.}
+
+\item{schema}{A \link[=as_nanoarrow_schema]{nanoarrow_schema} or \code{NULL} 
to guess
+based on the first schema.}
+
+\item{validate}{Use \code{FALSE} to skip the validation step (i.e., if you
+know that the arrays are valid).}
 }
 \value{
-An R vector of type \code{to}.
+\itemize{
+\item \code{convert_array_stream()}: An R vector of type \code{to}.
+\item \code{collect_array_stream()}: A \code{list()} of 
\link[=as_nanoarrow_array]{nanoarrow_array}
+}
 }
 \description{
 Converts \code{array_stream} to the type specified by \code{to}. This is a 
low-level
@@ -35,4 +47,7 @@ stream <- as_nanoarrow_array_stream(data.frame(x = 1:5))
 str(convert_array_stream(stream))
 str(convert_array_stream(stream, to = data.frame(x = double())))
 
+stream <- as_nanoarrow_array_stream(data.frame(x = 1:5))
+collect_array_stream(stream)
+
 }
diff --git a/r/src/array.c b/r/src/array.c
index 2869f55..6a15f4a 100644
--- a/r/src/array.c
+++ b/r/src/array.c
@@ -375,14 +375,6 @@ static SEXP borrow_unknown_buffer(struct ArrowArray* 
array, int64_t i, SEXP shel
   return buffer_borrowed_xptr(array->buffers[i], 0, shelter);
 }
 
-static SEXP length_from_int64(int64_t value) {
-  if (value < INT_MAX) {
-    return Rf_ScalarInteger(value);
-  } else {
-    return Rf_ScalarReal(value);
-  }
-}
-
 static SEXP borrow_buffer(struct ArrowArrayView* array_view, int64_t i, SEXP 
shelter) {
   SEXP buffer_class = PROTECT(Rf_allocVector(STRSXP, 2));
   SET_STRING_ELT(buffer_class, 1, Rf_mkChar("nanoarrow_buffer"));
@@ -410,9 +402,9 @@ SEXP nanoarrow_c_array_proxy(SEXP array_xptr, SEXP 
array_view_xptr, SEXP recursi
                          "children", "dictionary", ""};
   SEXP array_proxy = PROTECT(Rf_mkNamed(VECSXP, names));
 
-  SET_VECTOR_ELT(array_proxy, 0, length_from_int64(array->length));
-  SET_VECTOR_ELT(array_proxy, 1, length_from_int64(array->null_count));
-  SET_VECTOR_ELT(array_proxy, 2, length_from_int64(array->offset));
+  SET_VECTOR_ELT(array_proxy, 0, length_sexp_from_int64(array->length));
+  SET_VECTOR_ELT(array_proxy, 1, length_sexp_from_int64(array->null_count));
+  SET_VECTOR_ELT(array_proxy, 2, length_sexp_from_int64(array->offset));
 
   if (array->n_buffers > 0) {
     SEXP buffers = PROTECT(Rf_allocVector(VECSXP, array->n_buffers));
diff --git a/r/src/array_stream.c b/r/src/array_stream.c
index 5e73f19..6711a0b 100644
--- a/r/src/array_stream.c
+++ b/r/src/array_stream.c
@@ -141,6 +141,18 @@ SEXP nanoarrow_c_basic_array_stream(SEXP batches_sexp, 
SEXP schema_xptr,
   return array_stream_xptr;
 }
 
+SEXP nanoarrow_c_array_list_total_length(SEXP list_of_array_xptr) {
+  int64_t total_length = 0;
+  R_xlen_t num_chunks = Rf_xlength(list_of_array_xptr);
+  for (R_xlen_t i = 0; i < num_chunks; i++) {
+    struct ArrowArray* chunk =
+        (struct ArrowArray*)R_ExternalPtrAddr(VECTOR_ELT(list_of_array_xptr, 
i));
+    total_length += chunk->length;
+  }
+
+  return length_sexp_from_int64(total_length);
+}
+
 // Implementation of an ArrowArrayStream that keeps a dependent object valid
 struct WrapperArrayStreamData {
   SEXP parent_array_stream_xptr;
diff --git a/r/src/init.c b/r/src/init.c
index 964ca19..95a4454 100644
--- a/r/src/init.c
+++ b/r/src/init.c
@@ -31,6 +31,7 @@ extern SEXP nanoarrow_c_array_stream_get_schema(SEXP 
array_stream_xptr);
 extern SEXP nanoarrow_c_array_stream_get_next(SEXP array_stream_xptr);
 extern SEXP nanoarrow_c_basic_array_stream(SEXP batches_sexp, SEXP schema_xptr,
                                            SEXP validate_sexp);
+extern SEXP nanoarrow_c_array_list_total_length(SEXP list_of_array_xptr);
 extern SEXP nanoarrow_c_array_view(SEXP array_xptr, SEXP schema_xptr);
 extern SEXP nanoarrow_c_array_init(SEXP schema_xptr);
 extern SEXP nanoarrow_c_array_set_length(SEXP array_xptr, SEXP length_sexp);
@@ -102,6 +103,8 @@ static const R_CallMethodDef CallEntries[] = {
      1},
     {"nanoarrow_c_array_stream_get_next", 
(DL_FUNC)&nanoarrow_c_array_stream_get_next, 1},
     {"nanoarrow_c_basic_array_stream", 
(DL_FUNC)&nanoarrow_c_basic_array_stream, 3},
+    {"nanoarrow_c_array_list_total_length", 
(DL_FUNC)&nanoarrow_c_array_list_total_length,
+     1},
     {"nanoarrow_c_array_view", (DL_FUNC)&nanoarrow_c_array_view, 2},
     {"nanoarrow_c_array_init", (DL_FUNC)&nanoarrow_c_array_init, 1},
     {"nanoarrow_c_array_set_length", (DL_FUNC)&nanoarrow_c_array_set_length, 
2},
diff --git a/r/src/util.h b/r/src/util.h
index 338cd56..28b1bde 100644
--- a/r/src/util.h
+++ b/r/src/util.h
@@ -55,4 +55,15 @@ static inline void check_trivial_alloc(const void* ptr, 
const char* ptr_type) {
   }
 }
 
+// So that lengths >INT_MAX do not overflow an INTSXP. Most places
+// in R return an integer length except for lengths where this is not
+// possible.
+static inline SEXP length_sexp_from_int64(int64_t value) {
+  if (value < INT_MAX) {
+    return Rf_ScalarInteger(value);
+  } else {
+    return Rf_ScalarReal(value);
+  }
+}
+
 #endif
diff --git a/r/tools/make-callentries.R b/r/tools/make-callentries.R
index ace5488..403169c 100644
--- a/r/tools/make-callentries.R
+++ b/r/tools/make-callentries.R
@@ -74,3 +74,5 @@ stopifnot(str_detect(init, pattern))
 init %>%
   str_replace(pattern, header) %>%
   write_file("src/init.c")
+
+system("clang-format -i src/init.c")

Reply via email to