[jira] [Commented] (ARROW-12688) [R] Use DuckDB to query an Arrow Dataset

2021-07-27 Thread Ian Cook (Jira)


[ 
https://issues.apache.org/jira/browse/ARROW-12688?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17388385#comment-17388385
 ] 

Ian Cook commented on ARROW-12688:
--

See ARROW-13472 for reconsideration of the user interaction design for querying 
with the DuckDB engine.

> [R] Use DuckDB to query an Arrow Dataset
> 
>
> Key: ARROW-12688
> URL: https://issues.apache.org/jira/browse/ARROW-12688
> Project: Apache Arrow
>  Issue Type: New Feature
>  Components: C++, R
>Reporter: Neal Richardson
>Assignee: Jonathan Keane
>Priority: Major
>  Labels: pull-request-available
> Fix For: 6.0.0
>
>  Time Spent: 5h 50m
>  Remaining Estimate: 0h
>
> DuckDB can read data from an Arrow C-interface stream. Once we can provide 
> that struct from R, presumably DuckDB could query on that stream. 
> A first step is just connecting the pieces. A second step would be to handle 
> parts of the DuckDB query and push down filtering/projection to Arrow. 
> We need a function something like this:
> {code}
> #' Run a DuckDB query on Arrow data
> #'
> #' @param .data An `arrow` data object: `Dataset`, `Table`, `RecordBatch`, or 
> #' an `arrow_dplyr_query` containing filter/mutate/etc. expressions
> #' @return A `duckdb::duckdb_connection`
> to_duckdb <- function(.data) {
>   # ARROW-12687: [C++][Python][Dataset] Convert Scanner into a RecordBatchReader
>   reader <- Scanner$create(.data)$ToRecordBatchReader()
>   # ARROW-12689: [R] Implement ArrowArrayStream C interface
>   stream_ptr <- allocate_arrow_array_stream()
>   on.exit(delete_arrow_array_stream(stream_ptr))
>   ExportRecordBatchReader(reader, stream_ptr)
>   # TODO: DuckDB method to create table/connection from ArrowArrayStream ptr
>   duckdb::duck_connection_from_arrow_stream(stream_ptr)
> }
> {code}
> Assuming this existed, we could do something like (a variation of 
> https://arrow.apache.org/docs/r/articles/dataset.html):
> {code}
> ds <- open_dataset("nyc-taxi", partitioning = c("year", "month"))
> ds %>%
>   filter(total_amount > 100, year == 2015) %>%
>   select(tip_amount, total_amount, passenger_count) %>%
>   mutate(tip_pct = 100 * tip_amount / total_amount) %>%
>   to_duckdb() %>%
>   group_by(passenger_count) %>%
>   summarise(
>     median_tip_pct = median(tip_pct),
>     n = n()
>   )
> {code}
> and duckdb would do the aggregation while the data reading, predicate 
> pushdown, filtering, and projection would happen in Arrow. Or you could do 
> {{dbGetQuery(ds, "SOME SQL")}} and that would evaluate on arrow data. 
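For comparison, a minimal sketch of the "connect the pieces" step using a registration function that the duckdb R package already provides, instead of the hypothetical `duck_connection_from_arrow_stream()`. It assumes the arrow, duckdb and DBI packages are installed and that `duckdb::duckdb_register_arrow()` accepts an Arrow Table; an in-memory `Table` built from `mtcars` stands in for the nyc-taxi Dataset, and the view name `mtcars_arrow` is arbitrary. Note that the SQL goes through a DuckDB connection rather than through the Dataset object itself.

```r
# Assumes the arrow, duckdb and DBI packages are installed.
library(arrow, warn.conflicts = FALSE)
library(DBI)

con <- dbConnect(duckdb::duckdb())

# An in-memory Arrow Table stands in for the nyc-taxi Dataset.
tab <- Table$create(mtcars)

# Register the Arrow object with DuckDB under an arbitrary view name; DuckDB
# then scans the Arrow data whenever the view is queried.
duckdb::duckdb_register_arrow(con, "mtcars_arrow", tab)

# The SQL runs against the DuckDB connection, not the Arrow object itself.
res <- dbGetQuery(
  con,
  "SELECT cyl, COUNT(*) AS n FROM mtcars_arrow GROUP BY cyl ORDER BY cyl"
)
print(res)

# Remove the view and close the connection.
duckdb::duckdb_unregister_arrow(con, "mtcars_arrow")
dbDisconnect(con)
```

As I understand it, registering creates a view over the Arrow data rather than copying it into DuckDB storage, which is what makes the "Arrow reads, DuckDB aggregates" split described above plausible.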



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (ARROW-12688) [R] Use DuckDB to query an Arrow Dataset

2021-07-15 Thread Jonathan Keane (Jira)


[ 
https://issues.apache.org/jira/browse/ARROW-12688?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17381548#comment-17381548
 ] 

Jonathan Keane commented on ARROW-12688:


Or `compute()` would probably be a better fit for this (cf. ARROW-11754 and 
ARROW-12282).



[jira] [Commented] (ARROW-12688) [R] Use DuckDB to query an Arrow Dataset

2021-07-15 Thread Jonathan Keane (Jira)


[ 
https://issues.apache.org/jira/browse/ARROW-12688?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17381546#comment-17381546
 ] 

Jonathan Keane commented on ARROW-12688:


Hmm, I wonder if we (also?) want a `collect(..., .to = c("arrow", "duckdb"))` 
that returns either a data frame / Arrow Table or a DuckDB-backed `tbl` 
reference, respectively, so that pipelines like the following work:

{code}
ds %>%
  select(...) %>%
  filter(...) %>%
  mutate(...) %>% 
  collect(.to = "duckdb") %>% 
  group_by(...) %>% 
  summarise(...) %>% 
  collect()
{code}
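A rough sketch of what the `.to` dispatch could look like, assuming the arrow, duckdb, DBI, dplyr and dbplyr packages are installed. `collect_to()` and its `table_name` default are hypothetical names for illustration only (a real version would presumably be an argument to `collect()` and generate unique table names). The duckdb branch relies on `duckdb::duckdb_register_arrow()` and returns a lazy `dplyr::tbl()`, so the later `group_by()`/`summarise()` are translated to SQL and run in DuckDB.

```r
# Assumes arrow, duckdb, DBI, dplyr and dbplyr are installed.
library(dplyr, warn.conflicts = FALSE)
library(arrow, warn.conflicts = FALSE)

# Hypothetical helper, not an arrow API.
collect_to <- function(.data, .to = c("arrow", "duckdb"),
                       con = DBI::dbConnect(duckdb::duckdb()),
                       table_name = "arrow_tbl") {
  .to <- match.arg(.to)
  if (.to == "arrow") {
    # Materialise the Arrow query as an R data frame, as collect() does today
    return(dplyr::collect(.data))
  }
  # Expose the (still lazy) Arrow query to DuckDB as a view...
  duckdb::duckdb_register_arrow(con, table_name, .data)
  # ...and return a dbplyr tbl so later verbs become DuckDB SQL
  dplyr::tbl(con, table_name)
}

tab <- Table$create(mtcars)

res <- tab %>%
  select(cyl, mpg) %>%
  filter(mpg > 20) %>%
  collect_to(.to = "duckdb") %>%
  group_by(cyl) %>%
  summarise(n = n()) %>%
  arrange(cyl) %>%
  collect()
```

Because R evaluates default arguments lazily, the `con` default only opens a connection when the duckdb branch is taken; still, it leaves one connection open per call, so reusing a cached connection (like `arrow_duck_connection()` in the comment below) is probably the better design.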






[jira] [Commented] (ARROW-12688) [R] Use DuckDB to query an Arrow Dataset

2021-07-15 Thread Neal Richardson (Jira)


[ 
https://issues.apache.org/jira/browse/ARROW-12688?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17381539#comment-17381539
 ] 

Neal Richardson commented on ARROW-12688:
-

Building from the code at 
https://github.com/pdet/duckdb-benchmark/blob/master/arrow/group_by_with_duckdb.R,
 I've worked up a slightly different interface, something we could add to the 
arrow package (adding duckdb and DBI to Suggests):

{code}

summarise.arrow_dplyr_query <- function(.data, ..., engine = c("arrow", "duckdb")) {
  if (match.arg(engine) == "duckdb") {
    summarise_duck(.data, ...)
  } else {
    # Continue with the current contents of summarise.arrow_dplyr_query
    # ...
  }
}

summarise_duck <- function(.data, ...) {
  # TODO better translation of aggregate functions, parse tree traversal
  aggregates <- vapply(rlang::enquos(...), rlang::quo_name, character(1))
  tbl_name <- paste0(replicate(10, sample(LETTERS, 1, TRUE)), collapse = "")

  con <- arrow_duck_connection()
  duckdb::duckdb_register_arrow(con, tbl_name, .data$data)
  on.exit(duckdb::duckdb_unregister_arrow(con, tbl_name))

  groups_str <- paste(.data$groups, collapse = ", ")
  aggr_str <- paste(aggregates, collapse = ", ")
  # TODO use relational API instead of SQL string construction
  DBI::dbGetQuery(con, sprintf("SELECT %s, %s FROM %s GROUP BY %s",
                               groups_str, aggr_str, tbl_name, groups_str))
}

arrow_duck_connection <- function() {
  con <- getOption("arrow_duck_con")
  if (is.null(con)) {
    # DBI is only in Suggests, so call it with an explicit namespace
    con <- DBI::dbConnect(duckdb::duckdb())
    # Use the same CPU count that the arrow library is set to
    DBI::dbExecute(con, paste0("PRAGMA threads=", cpu_count()))
    options(arrow_duck_con = con)
  }
  con
}
{code}
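Two gaps in the string-construction step seem worth flagging: `rlang::quo_name()` drops the output names (`median_tip_pct = median(tip_pct)` becomes just `median(tip_pct)`), and with no grouping columns the template yields `SELECT , ... GROUP BY `, which is invalid SQL. A minimal base-R sketch that keeps the names as `AS` aliases and only emits `GROUP BY` when there are groups; `aggregates_to_sql()` and `build_summary_sql()` are hypothetical helper names, and there is still no translation of R functions such as `n()` into SQL (that remains the TODO above).

```r
# Hypothetical helpers illustrating the SQL-building step of summarise_duck().

# Turn summarise()-style arguments into SQL select expressions, keeping the
# argument names as column aliases. No R-to-SQL function translation is done.
aggregates_to_sql <- function(...) {
  exprs <- as.list(substitute(list(...)))[-1]
  texts <- vapply(exprs, function(e) paste(deparse(e), collapse = " "),
                  character(1))
  nms <- names(exprs)
  if (is.null(nms)) nms <- rep("", length(exprs))
  ifelse(nzchar(nms), paste(texts, "AS", nms), texts)
}

# Assemble the query, emitting GROUP BY only when there are grouping columns.
build_summary_sql <- function(tbl_name, groups, aggregates) {
  select_list <- paste(c(groups, aggregates), collapse = ", ")
  sql <- sprintf("SELECT %s FROM %s", select_list, tbl_name)
  if (length(groups) > 0) {
    sql <- paste(sql, "GROUP BY", paste(groups, collapse = ", "))
  }
  sql
}

aggs <- aggregates_to_sql(avg_tip = avg(tip_pct), n = count(tip_pct))
grouped <- build_summary_sql("trips", "passenger_count", aggs)
ungrouped <- build_summary_sql("trips", character(0), aggs)
```

Here `grouped` is `SELECT passenger_count, avg(tip_pct) AS avg_tip, count(tip_pct) AS n FROM trips GROUP BY passenger_count`, and `ungrouped` simply omits the grouping column and the `GROUP BY` clause.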

Thoughts?
