drin commented on code in PR #14071:
URL: https://github.com/apache/arrow/pull/14071#discussion_r966171091
##########
cpp/src/arrow/engine/substrait/relation_internal.cc:
##########
@@ -52,159 +55,239 @@ Status CheckRelCommon(const RelMessage& rel) {
return Status::OK();
}
-Result<DeclarationInfo> FromProto(const substrait::Rel& rel, const
ExtensionSet& ext_set,
- const ConversionOptions& conversion_options)
{
- static bool dataset_init = false;
- if (!dataset_init) {
- dataset_init = true;
- dataset::internal::Initialize();
+Status CheckReadRelation(const substrait::ReadRel& rel,
+ const ConversionOptions& conversion_options) {
+ // NOTE: scan_options->projection is not used by the scanner and thus can't
be used
+ if (rel.has_projection()) {
+ return Status::NotImplemented("substrait::ReadRel::projection");
}
- switch (rel.rel_type_case()) {
- case substrait::Rel::RelTypeCase::kRead: {
- const auto& read = rel.read();
- RETURN_NOT_OK(CheckRelCommon(read));
+ if (rel.has_named_table()) {
+ if (!conversion_options.named_table_provider) {
+ return Status::Invalid(
+ "plan contained a named table but a NamedTableProvider has not been "
+ "configured");
+ }
+
+ return Status::OK();
+ }
- ARROW_ASSIGN_OR_RAISE(auto base_schema,
- FromProto(read.base_schema(), ext_set,
conversion_options));
- auto num_columns = static_cast<int>(base_schema->fields().size());
+ if (!rel.has_local_files()) {
+ return Status::NotImplemented(
+ "substrait::ReadRel with read_type other than LocalFiles");
+ }
- auto scan_options = std::make_shared<dataset::ScanOptions>();
- scan_options->use_threads = true;
+ if (rel.local_files().has_advanced_extension()) {
+ return
Status::NotImplemented("substrait::ReadRel::LocalFiles::advanced_extension");
+ }
- if (read.has_filter()) {
- ARROW_ASSIGN_OR_RAISE(scan_options->filter,
- FromProto(read.filter(), ext_set,
conversion_options));
- }
+ return Status::OK();
+}
- if (read.has_projection()) {
- // NOTE: scan_options->projection is not used by the scanner and thus
can't be
- // used for this
- return Status::NotImplemented("substrait::ReadRel::projection");
- }
+Status CheckFileItem(const substrait::ReadRel_LocalFiles_FileOrFiles&
file_item) {
+ if (file_item.partition_index() != 0) {
+ return Status::NotImplemented(
+ "non-default
substrait::ReadRel::LocalFiles::FileOrFiles::partition_index");
+ }
- if (read.has_named_table()) {
- if (!conversion_options.named_table_provider) {
- return Status::Invalid(
- "plan contained a named table but a NamedTableProvider has not
been "
- "configured");
- }
- const NamedTableProvider& named_table_provider =
- conversion_options.named_table_provider;
- const substrait::ReadRel::NamedTable& named_table = read.named_table();
- std::vector<std::string> table_names(named_table.names().begin(),
- named_table.names().end());
- ARROW_ASSIGN_OR_RAISE(compute::Declaration source_decl,
- named_table_provider(table_names));
- return DeclarationInfo{std::move(source_decl), num_columns};
- }
+ if (file_item.start() != 0) {
+ return Status::NotImplemented(
+ "non-default substrait::ReadRel::LocalFiles::FileOrFiles::start
offset");
+ }
+
+ if (file_item.length() != 0) {
+ return Status::NotImplemented(
+ "non-default substrait::ReadRel::LocalFiles::FileOrFiles::length");
+ }
+
+ return Status::OK();
+}
+
+Status CheckFilePathUri(const ::arrow::internal::Uri& uri) {
+ if (!uri.is_file_scheme()) {
+ return Status::NotImplemented("substrait::ReadRel::LocalFiles item (",
uri.ToString(),
+ ") with other than local filesystem
(file:///)");
+ }
+
+ if (uri.port() != -1) {
+ return Status::NotImplemented("substrait::ReadRel::LocalFiles item (",
uri.ToString(),
+ ") should not have a port number in path");
+ }
+
+ if (!uri.query_string().empty()) {
+ return Status::NotImplemented("substrait::ReadRel::LocalFiles item (",
uri.ToString(),
+ ") should not have a query string in path");
+ }
+
+ return Status::OK();
+}
+
+// Other helper functions
+Status DiscoverFilesFromDir(std::shared_ptr<fs::LocalFileSystem>& local_fs,
+ std::string dirpath, std::vector<fs::FileInfo>&
rel_fpaths) {
+ // Define a selector for a recursive descent
+ fs::FileSelector selector;
+ selector.base_dir = dirpath;
+ selector.recursive = true;
+
+ ARROW_ASSIGN_OR_RAISE(auto file_infos, local_fs->GetFileInfo(selector));
+ for (auto& file_info : file_infos) {
+ if (file_info.type() == fs::FileType::File) {
+ rel_fpaths.push_back(file_info);
+ }
+ }
+
+ return Status::OK();
+}
+
+// Function that implements "FromProto" for a substrait::ReadRel (read
relation)
+Result<DeclarationInfo> FromReadRelation(const substrait::ReadRel& rel,
Review Comment:
I just broke this portion out into a separate function and tried to keep the
function naming consistent (`FromReadRelation`, alongside the other `FromProto`-style helpers).
I can back out this reorganization, though: I'll create a JIRA issue and try to
move these changes to a more appropriate PR.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]