westonpace commented on a change in pull request #12316:
URL: https://github.com/apache/arrow/pull/12316#discussion_r806235649
##########
File path: r/R/dataset-write.R
##########
@@ -116,25 +116,40 @@ write_dataset <- function(dataset,
if (inherits(dataset, "arrow_dplyr_query")) {
# partitioning vars need to be in the `select` schema
dataset <- ensure_group_vars(dataset)
- } else if (inherits(dataset, "grouped_df")) {
- force(partitioning)
- # Drop the grouping metadata before writing; we've already consumed it
- # now to construct `partitioning` and don't want it in the metadata$r
- dataset <- dplyr::ungroup(dataset)
+ } else {
+ if (inherits(dataset, "grouped_df")) {
+ force(partitioning)
+ # Drop the grouping metadata before writing; we've already consumed it
+ # now to construct `partitioning` and don't want it in the metadata$r
+ dataset <- dplyr::ungroup(dataset)
+ }
+ dataset <- tryCatch(
+ as_adq(dataset),
+ error = function(e) {
+ stop("'dataset' must be a Dataset, RecordBatch, Table, arrow_dplyr_query, or data.frame, not ", deparse(class(dataset)), call. = FALSE)
+ }
+ )
}
- scanner <- Scanner$create(dataset)
+ plan <- ExecPlan$create()
+ final_node <- plan$Build(dataset)
+ # TODO: warn/error if there is sorting/top_k? or just compute? (this needs test)
Review comment:
I think a warning would be good. Someday we should be able to respect
the sort (the dataset writer already has a short serialized path at the
front, so this should be straightforward). Dataset writing writes out
chunks, and those chunks have indices, so if the user asked for a sort,
the data in chunk-0 should precede the data in chunk-1. I've created
ARROW-15681 to track this on the C++ side.
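For illustration, a rough sketch of the warning I have in mind (untested;
I'm assuming the query object exposes its sort keys through an
`arrange_vars` field, which may not match the actual internals):

```r
# Rough sketch only: `arrange_vars` is assumed to be where an
# arrow_dplyr_query stores the sort captured by dplyr::arrange().
if (inherits(dataset, "arrow_dplyr_query") && length(dataset$arrange_vars) > 0) {
  warning(
    "write_dataset() does not currently preserve the sort order of its input; ",
    "rows may be written in any order (ARROW-15681).",
    call. = FALSE
  )
}
```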
##########
File path: r/src/compute-exec.cpp
##########
@@ -157,7 +158,33 @@ std::shared_ptr<compute::ExecNode> ExecNode_Scan(
arrow::dataset::ScanNodeOptions{dataset, options});
}
-#endif
+// [[dataset::export]]
+void ExecPlan_Write(const std::shared_ptr<compute::ExecPlan>& plan,
+ const std::shared_ptr<compute::ExecNode>& final_node,
+ const std::shared_ptr<ds::FileWriteOptions>& file_write_options,
+ const std::shared_ptr<fs::FileSystem>& filesystem,
+ std::string base_dir,
+ const std::shared_ptr<ds::Partitioning>& partitioning,
+ std::string basename_template,
+ arrow::dataset::ExistingDataBehavior existing_data_behavior,
+ int max_partitions) {
+ ds::FileSystemDatasetWriteOptions opts;
+ opts.file_write_options = file_write_options;
+ opts.existing_data_behavior = existing_data_behavior;
+ opts.filesystem = filesystem;
+ opts.base_dir = base_dir;
+ opts.partitioning = partitioning;
+ opts.basename_template = basename_template;
+ opts.max_partitions = max_partitions;
+
+ MakeExecNodeOrStop(
+ "write", final_node->plan(), {final_node.get()},
+ ds::WriteNodeOptions{std::move(opts), std::move(final_node->output_schema())});
+
+ StopIfNotOk(plan->Validate());
+ StopIfNotOk(plan->StartProducing());
+ StopIfNotOk(plan->finished().status());
+}
// [[dataset::export]]
std::shared_ptr<compute::ExecNode> ExecNode_Filter(
Review comment:
I don't think this method was inside the `#if` block before. Did you
mean to include it? I might also be reading the git diff incorrectly.
##########
File path: r/src/compute-exec.cpp
##########
@@ -157,7 +158,33 @@ std::shared_ptr<compute::ExecNode> ExecNode_Scan(
arrow::dataset::ScanNodeOptions{dataset, options});
}
-#endif
+// [[dataset::export]]
+void ExecPlan_Write(const std::shared_ptr<compute::ExecPlan>& plan,
+ const std::shared_ptr<compute::ExecNode>& final_node,
+ const std::shared_ptr<ds::FileWriteOptions>& file_write_options,
+ const std::shared_ptr<fs::FileSystem>& filesystem,
+ std::string base_dir,
+ const std::shared_ptr<ds::Partitioning>& partitioning,
+ std::string basename_template,
+ arrow::dataset::ExistingDataBehavior existing_data_behavior,
+ int max_partitions) {
+ ds::FileSystemDatasetWriteOptions opts;
+ opts.file_write_options = file_write_options;
+ opts.existing_data_behavior = existing_data_behavior;
+ opts.filesystem = filesystem;
+ opts.base_dir = base_dir;
+ opts.partitioning = partitioning;
+ opts.basename_template = basename_template;
+ opts.max_partitions = max_partitions;
+
+ MakeExecNodeOrStop(
+ "write", final_node->plan(), {final_node.get()},
+ ds::WriteNodeOptions{std::move(opts), std::move(final_node->output_schema())});
+
+ StopIfNotOk(plan->Validate());
+ StopIfNotOk(plan->StartProducing());
+ StopIfNotOk(plan->finished().status());
+}
// [[dataset::export]]
std::shared_ptr<compute::ExecNode> ExecNode_Filter(
Review comment:
It should be possible in that case to use a table as the source, but I
agree it's probably not worth worrying about unless there is a compelling
use case.
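For example, something along these lines should work (a rough, untested
sketch reusing the helpers this PR already adds; `as_adq()` wraps any
in-memory Table in an `arrow_dplyr_query`):

```r
# Rough sketch, untested: feed an in-memory Table through the same
# plan-building path this PR uses for datasets.
tbl <- arrow::Table$create(x = 1:3, y = c("a", "b", "c"))
plan <- ExecPlan$create()              # internal R6 wrapper from this PR
final_node <- plan$Build(as_adq(tbl))  # source node backed by the Table
```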
##########
File path: r/src/compute-exec.cpp
##########
@@ -157,7 +158,33 @@ std::shared_ptr<compute::ExecNode> ExecNode_Scan(
arrow::dataset::ScanNodeOptions{dataset, options});
}
-#endif
+// [[dataset::export]]
+void ExecPlan_Write(const std::shared_ptr<compute::ExecPlan>& plan,
+ const std::shared_ptr<compute::ExecNode>& final_node,
+ const std::shared_ptr<ds::FileWriteOptions>& file_write_options,
+ const std::shared_ptr<fs::FileSystem>& filesystem,
+ std::string base_dir,
+ const std::shared_ptr<ds::Partitioning>& partitioning,
+ std::string basename_template,
+ arrow::dataset::ExistingDataBehavior existing_data_behavior,
+ int max_partitions) {
+ ds::FileSystemDatasetWriteOptions opts;
+ opts.file_write_options = file_write_options;
+ opts.existing_data_behavior = existing_data_behavior;
+ opts.filesystem = filesystem;
+ opts.base_dir = base_dir;
+ opts.partitioning = partitioning;
+ opts.basename_template = basename_template;
+ opts.max_partitions = max_partitions;
+
+ MakeExecNodeOrStop(
+ "write", final_node->plan(), {final_node.get()},
+ ds::WriteNodeOptions{std::move(opts), std::move(final_node->output_schema())});
+
+ StopIfNotOk(plan->Validate());
+ StopIfNotOk(plan->StartProducing());
+ StopIfNotOk(plan->finished().status());
+}
// [[dataset::export]]
std::shared_ptr<compute::ExecNode> ExecNode_Filter(
Review comment:
Soon it will be even easier than that: #12267
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]