paleolimbot commented on pull request #12564: URL: https://github.com/apache/arrow/pull/12564#issuecomment-1061817573
Ok! This is ready for some eyes. The C++ that works with the ExecPlan is something that I got to work, but it is not elegant (notably, it requires that the output field names are known in advance). As @westonpace mentioned above, the C++ may be able to absorb the implementation for substrait -> RecordBatchReader, which is really what we want.

I'm happy to put this PR on hold until there's more infrastructure to make this easier. My preference, though, would be to merge a preliminary version with an unexported `do_exec_plan_substrait()` so that Nic and I don't have to rebuild the R package every time we switch projects (but we can also make that work if merging sounds too hacky). Either way, we will likely need several follow-up PRs as the capabilities of C++ substrait and the R substrait package evolve.

To make sure this PR is going to serve its purpose, I also implemented a preliminary substrait package function that calls `do_exec_plan_substrait()` (included below for reference).
<details>

substrait_eval_arrow <- function(plan, tables) {
  plan <- as_substrait(plan, "substrait.Plan")
  stopifnot(rlang::is_named2(tables))

  # only support plans with exactly one relation in the relations list for now
  stopifnot(length(plan$relations) == 1)

  temp_parquet <- vapply(tables, function(i) tempfile(), character(1))
  on.exit(unlink(temp_parquet))

  local_file_tables <- lapply(seq_along(tables), function(i) {
    substrait$ReadRel$LocalFiles$create(
      items = list(
        substrait$ReadRel$LocalFiles$FileOrFiles$create(
          uri_file = sprintf("file://%s", temp_parquet[i]),
          format = substrait$ReadRel$LocalFiles$FileOrFiles$FileFormat$FILE_FORMAT_PARQUET
        )
      )
    )
  })
  names(local_file_tables) <- names(tables)

  table_base_schema <- lapply(tables, as_substrait, "substrait.NamedStruct")

  call_for_errors <- sys.call()

  # walk the relation tree looking for named tables, replacing
  # with those from local_file_tables
  plan <- rel_tree_modify(plan, "substrait_ReadRel", function(x) {
    if (isTRUE("named_table" %in% names(x))) {
      name <- x$named_table$names
      if (!isTRUE(name %in% names(local_file_tables))) {
        rlang::abort(
          sprintf("Named table '%s' not found in `tables`", name),
          call = call_for_errors
        )
      }

      if (!identical(x$base_schema, table_base_schema[[name]])) {
        rlang::abort(
          sprintf(
            "Base schema for table '%s' does not match declared base schema",
            name
          ),
          call = call_for_errors
        )
      }

      x$named_table <- NULL
      x$local_files <- local_file_tables[[name]]
      x
    } else {
      x
    }
  })

  col_names <- substrait_colnames(plan$relations[[1]])

  # write parquet files
  Map(arrow::write_parquet, tables, temp_parquet)

  # run the exec plan
  getNamespace("arrow")[["do_exec_plan_substrait"]](as.raw(plan), col_names)
}

</details>

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@arrow.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org