[ https://issues.apache.org/jira/browse/ARROW-12688?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17381539#comment-17381539 ]
Neal Richardson edited comment on ARROW-12688 at 7/15/21, 6:39 PM:
-------------------------------------------------------------------

Building from the code at https://github.com/pdet/duckdb-benchmark/blob/master/arrow/group_by_with_duckdb.R, I've worked up a slightly different interface, something we could add to the arrow package (adding duckdb and DBI to Suggests):

{code}
summarise.arrow_dplyr_query <- function(.data, ..., .engine = c("arrow", "duckdb")) {
  if (match.arg(.engine) == "duckdb") {
    summarise_duck(.data, ...)
  } else {
    # Continue with the current contents of summarise.arrow_dplyr_query
    # ...
  }
}

summarise_duck <- function(.data, ...) {
  # TODO: better translation of aggregate functions, parse tree traversal
  aggregates <- vapply(rlang::enquos(...), rlang::quo_name, character(1))
  # Random table name so concurrent registrations don't collide
  tbl_name <- paste0(sample(LETTERS, 10, replace = TRUE), collapse = "")
  con <- arrow_duck_connection()
  duckdb::duckdb_register_arrow(con, tbl_name, .data$data)
  on.exit(duckdb::duckdb_unregister_arrow(con, tbl_name))
  groups_str <- paste(.data$groups, collapse = ", ")
  aggr_str <- paste(aggregates, collapse = ", ")
  # TODO: use relational API instead of SQL string construction
  DBI::dbGetQuery(con, sprintf(
    "SELECT %s, %s FROM %s GROUP BY %s",
    groups_str, aggr_str, tbl_name, groups_str
  ))
}

arrow_duck_connection <- function() {
  # Cache the connection in an option so repeated calls reuse it
  con <- getOption("arrow_duck_con")
  if (is.null(con)) {
    con <- DBI::dbConnect(duckdb::duckdb())
    # Use the same CPU count that the arrow library is set to
    DBI::dbExecute(con, paste0("PRAGMA threads=", cpu_count()))
    options(arrow_duck_con = con)
  }
  con
}
{code}

Usage from dplyr would look something like the sketch below. Thoughts?
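For concreteness, a minimal usage sketch. It assumes the {{.engine}} argument above and the nyc-taxi dataset from the issue description; {{sum()}} is used because it is spelled the same in R and in DuckDB SQL, so the naive {{quo_name()}} translation above happens to produce valid SQL:

{code}
library(arrow)
library(dplyr)

ds <- open_dataset("nyc-taxi", partitioning = c("year", "month"))

# group_by() wraps the Dataset in an arrow_dplyr_query, so summarise()
# dispatches to the method above; .engine = "duckdb" hands the
# aggregation off to DuckDB while Arrow does the reading
ds %>%
  group_by(passenger_count) %>%
  summarise(sum(tip_amount), .engine = "duckdb")
{code}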
> [R] Use DuckDB to query an Arrow Dataset
> ----------------------------------------
>
>                 Key: ARROW-12688
>                 URL: https://issues.apache.org/jira/browse/ARROW-12688
>             Project: Apache Arrow
>          Issue Type: New Feature
>          Components: C++, R
>            Reporter: Neal Richardson
>            Assignee: Jonathan Keane
>            Priority: Major
>
> DuckDB can read data from an Arrow C-interface stream. Once we can provide that struct from R, presumably DuckDB could query on that stream.
> A first step is just connecting the pieces. A second step would be to handle parts of the DuckDB query and push down filtering/projection to Arrow.
> We need a function something like this:
> {code}
> #' Run a DuckDB query on Arrow data
> #'
> #' @param .data An `arrow` data object: `Dataset`, `Table`, `RecordBatch`, or
> #'   an `arrow_dplyr_query` containing filter/mutate/etc. expressions
> #' @return A `duckdb::duckdb_connection`
> to_duckdb <- function(.data) {
>   # ARROW-12687: [C++][Python][Dataset] Convert Scanner into a RecordBatchReader
>   reader <- Scanner$create(.data)$ToRecordBatchReader()
>   # ARROW-12689: [R] Implement ArrowArrayStream C interface
>   stream_ptr <- allocate_arrow_array_stream()
>   on.exit(delete_arrow_array_stream(stream_ptr))
>   ExportRecordBatchReader(reader, stream_ptr)
>   # TODO: DuckDB method to create table/connection from ArrowArrayStream ptr
>   duckdb::duck_connection_from_arrow_stream(stream_ptr)
> }
> {code}
> Assuming this existed, we could do something like this (a variation of https://arrow.apache.org/docs/r/articles/dataset.html):
> {code}
> ds <- open_dataset("nyc-taxi", partitioning = c("year", "month"))
>
> ds %>%
>   filter(total_amount > 100, year == 2015) %>%
>   select(tip_amount, total_amount, passenger_count) %>%
>   mutate(tip_pct = 100 * tip_amount / total_amount) %>%
>   to_duckdb() %>%
>   group_by(passenger_count) %>%
>   summarise(
>     median_tip_pct = median(tip_pct),
>     n = n()
>   )
> {code}
> and duckdb would do the aggregation, while the data reading, predicate pushdown, filtering, and projection would happen in Arrow. Or you could do {{dbGetQuery(ds, "SOME SQL")}} and that would evaluate on arrow data; a sketch of that idea is below.
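A {{dbGetQuery()}} method on a Dataset doesn't exist yet, so here is a rough sketch of that last idea with an explicit connection, reusing {{duckdb_register_arrow()}} from the comment above (the table name "taxi" and the assumption that registration accepts a Dataset directly are both hypothetical):

{code}
library(arrow)

ds <- open_dataset("nyc-taxi", partitioning = c("year", "month"))

con <- DBI::dbConnect(duckdb::duckdb())
# Register the Dataset under a table name that SQL can reference;
# assumes duckdb_register_arrow() accepts a Dataset, as in the sketch above
duckdb::duckdb_register_arrow(con, "taxi", ds)

# DuckDB aggregates; reading/filtering of the registered data stays in Arrow
DBI::dbGetQuery(con, "
  SELECT passenger_count, COUNT(*) AS n
  FROM taxi
  WHERE total_amount > 100 AND year = 2015
  GROUP BY passenger_count
")

duckdb::duckdb_unregister_arrow(con, "taxi")
DBI::dbDisconnect(con, shutdown = TRUE)
{code}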