vibhatha commented on code in PR #14071:
URL: https://github.com/apache/arrow/pull/14071#discussion_r965403621


##########
cpp/src/arrow/engine/substrait/relation_internal.cc:
##########
@@ -52,159 +55,239 @@ Status CheckRelCommon(const RelMessage& rel) {
   return Status::OK();
 }
 
-Result<DeclarationInfo> FromProto(const substrait::Rel& rel, const 
ExtensionSet& ext_set,
-                                  const ConversionOptions& conversion_options) 
{
-  static bool dataset_init = false;
-  if (!dataset_init) {
-    dataset_init = true;
-    dataset::internal::Initialize();
+Status CheckReadRelation(const substrait::ReadRel& rel,
+                         const ConversionOptions& conversion_options) {
+  // NOTE: scan_options->projection is not used by the scanner and thus can't 
be used
+  if (rel.has_projection()) {
+    return Status::NotImplemented("substrait::ReadRel::projection");
   }
 
-  switch (rel.rel_type_case()) {
-    case substrait::Rel::RelTypeCase::kRead: {
-      const auto& read = rel.read();
-      RETURN_NOT_OK(CheckRelCommon(read));
+  if (rel.has_named_table()) {
+    if (!conversion_options.named_table_provider) {
+      return Status::Invalid(
+          "plan contained a named table but a NamedTableProvider has not been "
+          "configured");
+    }
+
+    return Status::OK();
+  }
 
-      ARROW_ASSIGN_OR_RAISE(auto base_schema,
-                            FromProto(read.base_schema(), ext_set, 
conversion_options));
-      auto num_columns = static_cast<int>(base_schema->fields().size());
+  if (!rel.has_local_files()) {
+    return Status::NotImplemented(
+        "substrait::ReadRel with read_type other than LocalFiles");
+  }
 
-      auto scan_options = std::make_shared<dataset::ScanOptions>();
-      scan_options->use_threads = true;
+  if (rel.local_files().has_advanced_extension()) {
+    return 
Status::NotImplemented("substrait::ReadRel::LocalFiles::advanced_extension");
+  }
 
-      if (read.has_filter()) {
-        ARROW_ASSIGN_OR_RAISE(scan_options->filter,
-                              FromProto(read.filter(), ext_set, 
conversion_options));
-      }
+  return Status::OK();
+}
 
-      if (read.has_projection()) {
-        // NOTE: scan_options->projection is not used by the scanner and thus 
can't be
-        // used for this
-        return Status::NotImplemented("substrait::ReadRel::projection");
-      }
+Status CheckFileItem(const substrait::ReadRel_LocalFiles_FileOrFiles& 
file_item) {
+  if (file_item.partition_index() != 0) {
+    return Status::NotImplemented(
+        "non-default 
substrait::ReadRel::LocalFiles::FileOrFiles::partition_index");
+  }
 
-      if (read.has_named_table()) {
-        if (!conversion_options.named_table_provider) {
-          return Status::Invalid(
-              "plan contained a named table but a NamedTableProvider has not 
been "
-              "configured");
-        }
-        const NamedTableProvider& named_table_provider =
-            conversion_options.named_table_provider;
-        const substrait::ReadRel::NamedTable& named_table = read.named_table();
-        std::vector<std::string> table_names(named_table.names().begin(),
-                                             named_table.names().end());
-        ARROW_ASSIGN_OR_RAISE(compute::Declaration source_decl,
-                              named_table_provider(table_names));
-        return DeclarationInfo{std::move(source_decl), num_columns};
-      }
+  if (file_item.start() != 0) {
+    return Status::NotImplemented(
+        "non-default substrait::ReadRel::LocalFiles::FileOrFiles::start 
offset");
+  }
+
+  if (file_item.length() != 0) {
+    return Status::NotImplemented(
+        "non-default substrait::ReadRel::LocalFiles::FileOrFiles::length");
+  }
+
+  return Status::OK();
+}
+
+Status CheckFilePathUri(const ::arrow::internal::Uri& uri) {
+  if (!uri.is_file_scheme()) {
+    return Status::NotImplemented("substrait::ReadRel::LocalFiles item (", 
uri.ToString(),
+                                  ") with other than local filesystem 
(file:///)");
+  }
+
+  if (uri.port() != -1) {
+    return Status::NotImplemented("substrait::ReadRel::LocalFiles item (", 
uri.ToString(),
+                                  ") should not have a port number in path");
+  }
+
+  if (!uri.query_string().empty()) {
+    return Status::NotImplemented("substrait::ReadRel::LocalFiles item (", 
uri.ToString(),
+                                  ") should not have a query string in path");
+  }
+
+  return Status::OK();
+}
+
+// Other helper functions
+Status DiscoverFilesFromDir(std::shared_ptr<fs::LocalFileSystem>& local_fs,
+                            std::string dirpath, std::vector<fs::FileInfo>& 
rel_fpaths) {
+  // Define a selector for a recursive descent
+  fs::FileSelector selector;
+  selector.base_dir = dirpath;
+  selector.recursive = true;
+
+  ARROW_ASSIGN_OR_RAISE(auto file_infos, local_fs->GetFileInfo(selector));
+  for (auto& file_info : file_infos) {
+    if (file_info.type() == fs::FileType::File) {
+      rel_fpaths.push_back(file_info);
+    }
+  }
+
+  return Status::OK();
+}
+
+// Function that implements "FromProto" for a substrait::ReadRel (read 
relation)
+Result<DeclarationInfo> FromReadRelation(const substrait::ReadRel& rel,

Review Comment:
   I am not sure if we should change the function name. I believe `FromProto` 
is used by all the other relations. If we are going to change that format, 
better to do it in a separate PR for all relations and keep things uniform. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@arrow.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to