westonpace commented on code in PR #13401:
URL: https://github.com/apache/arrow/pull/13401#discussion_r918433307


##########
cpp/src/arrow/engine/substrait/relation_internal.cc:
##########
@@ -318,5 +324,81 @@ Result<compute::Declaration> FromProto(const substrait::Rel& rel,
       rel.DebugString());
 }
 
+namespace {
+
+Result<std::unique_ptr<substrait::ReadRel>> MakeReadRelation(
+    const compute::Declaration& declaration, ExtensionSet* ext_set) {
+  auto read_rel = make_unique<substrait::ReadRel>();
+  const auto& scan_node_options =
+      checked_cast<const dataset::ScanNodeOptions&>(*declaration.options);
+
+  auto dataset =
+      dynamic_cast<dataset::FileSystemDataset*>(scan_node_options.dataset.get());
+  if (dataset == nullptr) {
+    return Status::Invalid("Can only convert file system datasets to a Substrait plan.");
+  }
+  // set schema
+  ARROW_ASSIGN_OR_RAISE(auto named_struct, ToProto(*dataset->schema(), ext_set));
+  read_rel->set_allocated_base_schema(named_struct.release());
+
+  // set local files
+  auto read_rel_lfs = make_unique<substrait::ReadRel_LocalFiles>();
+  for (const auto& file : dataset->files()) {
+    auto read_rel_lfs_ffs = make_unique<substrait::ReadRel_LocalFiles_FileOrFiles>();
+    read_rel_lfs_ffs->set_uri_path("file://" + file);
+
+    // set file format
+    // arrow and feather are temporarily handled via the Parquet format until
+    // upgraded to the latest Substrait version.
+    auto format_type_name = dataset->format()->type_name();
+    if (format_type_name == "parquet") {
+      auto parquet_fmt =
+          make_unique<substrait::ReadRel_LocalFiles_FileOrFiles_ParquetReadOptions>();
+      read_rel_lfs_ffs->set_allocated_parquet(parquet_fmt.release());
+    } else if (format_type_name == "arrow") {
+      auto arrow_fmt =
+          make_unique<substrait::ReadRel_LocalFiles_FileOrFiles_ArrowReadOptions>();
+      read_rel_lfs_ffs->set_allocated_arrow(arrow_fmt.release());
+    } else if (format_type_name == "orc") {
+      auto orc_fmt =
+          make_unique<substrait::ReadRel_LocalFiles_FileOrFiles_OrcReadOptions>();
+      read_rel_lfs_ffs->set_allocated_orc(orc_fmt.release());
+    } else {
+      return Status::Invalid("Unsupported file type : ", format_type_name);

Review Comment:
   ```suggestion
      return Status::NotImplemented("Unsupported file type: ", format_type_name);
   ```



##########
cpp/src/arrow/engine/substrait/serde_test.cc:
##########
@@ -1383,5 +1400,188 @@ TEST(Substrait, JoinPlanInvalidKeys) {
   }
 }
 
+TEST(Substrait, SerializeRelation) {
+#ifdef _WIN32
+  GTEST_SKIP() << "ARROW-16392: Substrait File URI not supported for Windows";
+#else
+  ExtensionSet ext_set;
+  auto dummy_schema = schema({field("foo", binary())});
+  // creating a dummy dataset using a dummy table
+  auto format = std::make_shared<arrow::dataset::ParquetFileFormat>();
+  auto filesystem = std::make_shared<fs::LocalFileSystem>();
+
+  std::vector<fs::FileInfo> files;
+  const std::vector<std::string> f_paths = {"/tmp/data1.parquet", "/tmp/data2.parquet"};
+
+  for (const auto& f_path : f_paths) {
+    ASSERT_OK_AND_ASSIGN(auto f_file, filesystem->GetFileInfo(f_path));
+    files.push_back(std::move(f_file));
+  }
+
+  ASSERT_OK_AND_ASSIGN(auto ds_factory, dataset::FileSystemDatasetFactory::Make(
+                                            std::move(filesystem), std::move(files),
+                                            std::move(format), {}));
+  ASSERT_OK_AND_ASSIGN(auto dataset, ds_factory->Finish(dummy_schema));
+
+  auto options = std::make_shared<dataset::ScanOptions>();
+  options->projection = compute::project({}, {});
+  auto scan_node_options = dataset::ScanNodeOptions{dataset, options};
+
+  auto scan_declaration = compute::Declaration({"scan", scan_node_options});
+
+  ASSERT_OK_AND_ASSIGN(auto serialized_rel,
+                       SerializeRelation(scan_declaration, &ext_set));
+  ASSERT_OK_AND_ASSIGN(auto deserialized_decl,
+                       DeserializeRelation(*serialized_rel, ext_set));
+
+  auto dataset_comparator = [](std::shared_ptr<dataset::Dataset> ds_lhs,
+                               std::shared_ptr<dataset::Dataset> ds_rhs) -> bool {
+    const auto& fsd_lhs = checked_cast<const dataset::FileSystemDataset&>(*ds_lhs);
+    const auto& fsd_rhs = checked_cast<const dataset::FileSystemDataset&>(*ds_lhs);
+    const auto& files_lhs = fsd_lhs.files();
+    const auto& files_rhs = fsd_rhs.files();
+
+    if (files_lhs.size() != files_rhs.size()) {
+      return false;
+    }
+    uint64_t fidx = 0;
+    for (const auto& l_file : files_lhs) {
+      if (l_file != files_rhs[fidx]) {
+        return false;
+      }
+      fidx++;
+    }
+    bool cmp_file_format = fsd_lhs.format()->Equals(*fsd_lhs.format());
+    bool cmp_file_system = fsd_lhs.filesystem()->Equals(fsd_rhs.filesystem());
+    return cmp_file_format && cmp_file_system;
+  };
+
+  auto scan_option_comparator = [dataset_comparator](
+                                    const dataset::ScanNodeOptions& lhs,
+                                    const dataset::ScanNodeOptions& rhs) -> bool {
+    bool cmp_rso = lhs.require_sequenced_output == rhs.require_sequenced_output;
+    bool cmp_ds = dataset_comparator(lhs.dataset, rhs.dataset);
+    return cmp_rso && cmp_ds;
+  };
+
+  EXPECT_EQ(deserialized_decl.factory_name, scan_declaration.factory_name);
+  const auto& lhs =
+      checked_cast<const dataset::ScanNodeOptions&>(*deserialized_decl.options);
+  const auto& rhs =
+      checked_cast<const dataset::ScanNodeOptions&>(*scan_declaration.options);
+  ASSERT_TRUE(scan_option_comparator(lhs, rhs));
+#endif
+}
+
+TEST(Substrait, SerializeRelationEndToEnd) {
+#ifdef _WIN32
+  GTEST_SKIP() << "ARROW-16392: Substrait File URI not supported for Windows";
+#else
+  ExtensionSet ext_set;
+  compute::ExecContext exec_context;
+
+  auto dummy_schema = schema({field("a", int32()), field("b", int32())});
+  auto table = TableFromJSON(dummy_schema, {R"([
+    [1, 1],
+    [3, 4]
+                        ])",
+                                            R"([
+    [0, 2],
+    [1, 3],
+    [4, 1],
+    [3, 1],
+    [1, 2]
+                        ])",
+                                            R"([
+    [2, 2],
+    [5, 3],
+    [1, 3]
+                        ])"});
+  // Note:: adding `/` to support the currently supported file read format from file
+  // system for Substrait
+  const std::string path = "/testing.parquet";
+
+  EXPECT_OK_AND_ASSIGN(auto filesystem,
+                       fs::internal::MockFileSystem::Make(fs::kNoTime, {}));
+
+  EXPECT_EQ(WriteParquetData(path, filesystem, table), true);
+
+  auto format = std::make_shared<arrow::dataset::ParquetFileFormat>();
+
+  std::vector<fs::FileInfo> files;
+  ASSERT_OK_AND_ASSIGN(auto f_file, filesystem->GetFileInfo(path));
+  files.push_back(std::move(f_file));
+
+  ASSERT_OK_AND_ASSIGN(auto ds_factory, dataset::FileSystemDatasetFactory::Make(
+                                            std::move(filesystem), std::move(files),
+                                            std::move(format), {}));
+
+  ASSERT_OK_AND_ASSIGN(auto dataset, ds_factory->Finish(dummy_schema));
+
+  auto options = std::make_shared<dataset::ScanOptions>();
+  options->projection = compute::project({}, {});
+
+  auto scan_node_options = dataset::ScanNodeOptions{dataset, options};
+
+  arrow::AsyncGenerator<util::optional<compute::ExecBatch> > sink_gen;
+
+  auto sink_node_options = compute::SinkNodeOptions{&sink_gen};
+
+  auto scan_declaration = compute::Declaration({"scan", scan_node_options});
+  auto sink_declaration = compute::Declaration({"sink", sink_node_options});
+
+  auto declarations =
+      compute::Declaration::Sequence({scan_declaration, sink_declaration});
+
+  ASSERT_OK_AND_ASSIGN(auto plan, compute::ExecPlan::Make(&exec_context));
+  ASSERT_OK_AND_ASSIGN(auto decl, declarations.AddToPlan(plan.get()));
+
+  ASSERT_OK(decl->Validate());
+
+  std::shared_ptr<arrow::RecordBatchReader> sink_reader = compute::MakeGeneratorReader(
+      dummy_schema, std::move(sink_gen), exec_context.memory_pool());
+
+  ASSERT_OK(plan->Validate());
+  ASSERT_OK(plan->StartProducing());
+
+  std::shared_ptr<arrow::Table> response_table;
+
+  ASSERT_OK_AND_ASSIGN(response_table,
+                       arrow::Table::FromRecordBatchReader(sink_reader.get()));
+
+  ASSERT_OK_AND_ASSIGN(auto serialized_rel,
+                       SerializeRelation(scan_declaration, &ext_set));
+  ASSERT_OK_AND_ASSIGN(auto deserialized_decl,
+                       DeserializeRelation(*serialized_rel, ext_set));
+
+  arrow::AsyncGenerator<util::optional<compute::ExecBatch> > des_sink_gen;

Review Comment:
   What is the `des` prefix for?  Can you use a whole word?
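   For example (purely a naming sketch; the longer identifier is illustrative):
   ```cpp
   // Naming sketch: spell out that this generator feeds the deserialized plan's sink.
   arrow::AsyncGenerator<util::optional<compute::ExecBatch>> deserialized_sink_gen;
   auto deserialized_sink_node_options = compute::SinkNodeOptions{&deserialized_sink_gen};
   ```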



##########
cpp/src/arrow/engine/substrait/serde_test.cc:
##########
@@ -45,6 +52,15 @@ using internal::checked_cast;
 
 namespace engine {
 
+bool WriteParquetData(const std::string& path,

Review Comment:
   Can you just return the `Status` here instead of a `bool`?
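   A rough sketch of what that could look like, assuming the helper just wraps
   `parquet::arrow::WriteTable` (the body isn't quoted here); names are illustrative:
   ```cpp
   // Sketch: propagate the error instead of collapsing it into a bool.
   // Assumes parquet/arrow/writer.h is already included by the test.
   Status WriteParquetData(const std::string& path,
                           const std::shared_ptr<fs::FileSystem>& file_system,
                           const std::shared_ptr<Table>& input) {
     ARROW_ASSIGN_OR_RAISE(auto out_stream, file_system->OpenOutputStream(path));
     return parquet::arrow::WriteTable(*input, arrow::default_memory_pool(), out_stream,
                                       /*chunk_size=*/3);
   }
   ```
   The call site would then become `ASSERT_OK(WriteParquetData(path, filesystem, table));`.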



##########
cpp/src/arrow/engine/substrait/relation_internal.cc:
##########
@@ -318,5 +324,81 @@ Result<compute::Declaration> FromProto(const substrait::Rel& rel,
       rel.DebugString());
 }
 
+namespace {
+
+Result<std::unique_ptr<substrait::ReadRel>> MakeReadRelation(
+    const compute::Declaration& declaration, ExtensionSet* ext_set) {
+  auto read_rel = make_unique<substrait::ReadRel>();
+  const auto& scan_node_options =
+      checked_cast<const dataset::ScanNodeOptions&>(*declaration.options);
+
+  auto dataset =
+      dynamic_cast<dataset::FileSystemDataset*>(scan_node_options.dataset.get());
+  if (dataset == nullptr) {
+    return Status::Invalid("Can only convert file system datasets to a Substrait plan.");
+  }
+  // set schema
+  ARROW_ASSIGN_OR_RAISE(auto named_struct, ToProto(*dataset->schema(), ext_set));
+  read_rel->set_allocated_base_schema(named_struct.release());
+
+  // set local files
+  auto read_rel_lfs = make_unique<substrait::ReadRel_LocalFiles>();
+  for (const auto& file : dataset->files()) {
+    auto read_rel_lfs_ffs = make_unique<substrait::ReadRel_LocalFiles_FileOrFiles>();
+    read_rel_lfs_ffs->set_uri_path("file://" + file);
+
+    // set file format
+    // arrow and feather are temporarily handled via the Parquet format until
+    // upgraded to the latest Substrait version.
+    auto format_type_name = dataset->format()->type_name();
+    if (format_type_name == "parquet") {
+      auto parquet_fmt =
+          make_unique<substrait::ReadRel_LocalFiles_FileOrFiles_ParquetReadOptions>();
+      read_rel_lfs_ffs->set_allocated_parquet(parquet_fmt.release());
+    } else if (format_type_name == "arrow") {
+      auto arrow_fmt =
+          make_unique<substrait::ReadRel_LocalFiles_FileOrFiles_ArrowReadOptions>();
+      read_rel_lfs_ffs->set_allocated_arrow(arrow_fmt.release());
+    } else if (format_type_name == "orc") {
+      auto orc_fmt =
+          make_unique<substrait::ReadRel_LocalFiles_FileOrFiles_OrcReadOptions>();
+      read_rel_lfs_ffs->set_allocated_orc(orc_fmt.release());
+    } else {
+      return Status::Invalid("Unsupported file type : ", format_type_name);
+    }
+    read_rel_lfs->mutable_items()->AddAllocated(read_rel_lfs_ffs.release());
+  }
+  *read_rel->mutable_local_files() = *read_rel_lfs.get();

Review Comment:
   ```suggestion
     read_rel->set_allocated_local_files(read_rel_lfs.release());
   ```



##########
cpp/src/arrow/engine/substrait/serde_test.cc:
##########
@@ -1383,5 +1400,188 @@ TEST(Substrait, JoinPlanInvalidKeys) {
   }
 }
 
+TEST(Substrait, SerializeRelation) {
+#ifdef _WIN32
+  GTEST_SKIP() << "ARROW-16392: Substrait File URI not supported for Windows";
+#else
+  ExtensionSet ext_set;
+  auto dummy_schema = schema({field("foo", binary())});
+  // creating a dummy dataset using a dummy table
+  auto format = std::make_shared<arrow::dataset::ParquetFileFormat>();
+  auto filesystem = std::make_shared<fs::LocalFileSystem>();
+
+  std::vector<fs::FileInfo> files;
+  const std::vector<std::string> f_paths = {"/tmp/data1.parquet", "/tmp/data2.parquet"};

Review Comment:
   `/tmp/` will not be portable once we support Windows URIs.  Can you use 
`arrow::internal::TemporaryDir` from `arrow/util/io_util.h`?
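   Something along these lines (a sketch; it assumes `TemporaryDir::Make` and
   `PlatformFilename::Join` from `arrow/util/io_util.h`, and the file names are
   illustrative):
   ```cpp
   // Sketch: create the files under a per-test temporary directory instead of /tmp.
   ASSERT_OK_AND_ASSIGN(auto temp_dir,
                        arrow::internal::TemporaryDir::Make("substrait-serde-test-"));
   ASSERT_OK_AND_ASSIGN(auto data1, temp_dir->path().Join("data1.parquet"));
   ASSERT_OK_AND_ASSIGN(auto data2, temp_dir->path().Join("data2.parquet"));
   const std::vector<std::string> f_paths = {data1.ToString(), data2.ToString()};
   ```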



##########
cpp/src/arrow/engine/substrait/serde_test.cc:
##########
@@ -1383,5 +1400,188 @@ TEST(Substrait, JoinPlanInvalidKeys) {
   }
 }
 
+TEST(Substrait, SerializeRelation) {
+#ifdef _WIN32
+  GTEST_SKIP() << "ARROW-16392: Substrait File URI not supported for Windows";
+#else
+  ExtensionSet ext_set;
+  auto dummy_schema = schema({field("foo", binary())});
+  // creating a dummy dataset using a dummy table
+  auto format = std::make_shared<arrow::dataset::ParquetFileFormat>();
+  auto filesystem = std::make_shared<fs::LocalFileSystem>();
+
+  std::vector<fs::FileInfo> files;
+  const std::vector<std::string> f_paths = {"/tmp/data1.parquet", "/tmp/data2.parquet"};
+
+  for (const auto& f_path : f_paths) {
+    ASSERT_OK_AND_ASSIGN(auto f_file, filesystem->GetFileInfo(f_path));
+    files.push_back(std::move(f_file));
+  }
+
+  ASSERT_OK_AND_ASSIGN(auto ds_factory, dataset::FileSystemDatasetFactory::Make(
+                                            std::move(filesystem), std::move(files),
+                                            std::move(format), {}));
+  ASSERT_OK_AND_ASSIGN(auto dataset, ds_factory->Finish(dummy_schema));
+
+  auto options = std::make_shared<dataset::ScanOptions>();
+  options->projection = compute::project({}, {});
+  auto scan_node_options = dataset::ScanNodeOptions{dataset, options};
+
+  auto scan_declaration = compute::Declaration({"scan", scan_node_options});
+
+  ASSERT_OK_AND_ASSIGN(auto serialized_rel,
+                       SerializeRelation(scan_declaration, &ext_set));
+  ASSERT_OK_AND_ASSIGN(auto deserialized_decl,
+                       DeserializeRelation(*serialized_rel, ext_set));
+
+  auto dataset_comparator = [](std::shared_ptr<dataset::Dataset> ds_lhs,
+                               std::shared_ptr<dataset::Dataset> ds_rhs) -> bool {
+    const auto& fsd_lhs = checked_cast<const dataset::FileSystemDataset&>(*ds_lhs);
+    const auto& fsd_rhs = checked_cast<const dataset::FileSystemDataset&>(*ds_lhs);
+    const auto& files_lhs = fsd_lhs.files();
+    const auto& files_rhs = fsd_rhs.files();
+
+    if (files_lhs.size() != files_rhs.size()) {
+      return false;
+    }
+    uint64_t fidx = 0;
+    for (const auto& l_file : files_lhs) {
+      if (l_file != files_rhs[fidx]) {
+        return false;
+      }
+      fidx++;
+    }
+    bool cmp_file_format = fsd_lhs.format()->Equals(*fsd_lhs.format());
+    bool cmp_file_system = fsd_lhs.filesystem()->Equals(fsd_rhs.filesystem());
+    return cmp_file_format && cmp_file_system;
+  };
+
+  auto scan_option_comparator = [dataset_comparator](
+                                    const dataset::ScanNodeOptions& lhs,
+                                    const dataset::ScanNodeOptions& rhs) -> bool {
+    bool cmp_rso = lhs.require_sequenced_output == rhs.require_sequenced_output;
+    bool cmp_ds = dataset_comparator(lhs.dataset, rhs.dataset);
+    return cmp_rso && cmp_ds;
+  };
+
+  EXPECT_EQ(deserialized_decl.factory_name, scan_declaration.factory_name);
+  const auto& lhs =
+      checked_cast<const dataset::ScanNodeOptions&>(*deserialized_decl.options);
+  const auto& rhs =
+      checked_cast<const dataset::ScanNodeOptions&>(*scan_declaration.options);
+  ASSERT_TRUE(scan_option_comparator(lhs, rhs));
+#endif
+}
+
+TEST(Substrait, SerializeRelationEndToEnd) {
+#ifdef _WIN32
+  GTEST_SKIP() << "ARROW-16392: Substrait File URI not supported for Windows";
+#else
+  ExtensionSet ext_set;
+  compute::ExecContext exec_context;
+
+  auto dummy_schema = schema({field("a", int32()), field("b", int32())});
+  auto table = TableFromJSON(dummy_schema, {R"([
+    [1, 1],
+    [3, 4]
+                        ])",
+                                            R"([
+    [0, 2],
+    [1, 3],
+    [4, 1],
+    [3, 1],
+    [1, 2]
+                        ])",
+                                            R"([
+    [2, 2],
+    [5, 3],
+    [1, 3]
+                        ])"});

Review Comment:
   The formatting here seems off
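   For example, the same data is easier to scan with one JSON chunk per line (formatting
   sketch only, no change to the values):
   ```cpp
   auto table = TableFromJSON(dummy_schema, {
       R"([[1, 1], [3, 4]])",
       R"([[0, 2], [1, 3], [4, 1], [3, 1], [1, 2]])",
       R"([[2, 2], [5, 3], [1, 3]])",
   });
   ```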



##########
cpp/src/arrow/engine/substrait/serde_test.cc:
##########
@@ -45,6 +52,15 @@ using internal::checked_cast;
 
 namespace engine {
 
+bool WriteParquetData(const std::string& path,
+                      const std::shared_ptr<fs::FileSystem> file_system,
+                      const std::shared_ptr<Table> input, const int64_t chunk_size = 3) {

Review Comment:
   You only call this in one place and you never supply a custom chunk size.  
Can you remove this parameter?  Or are you planning a test case that will 
manipulate it?  It doesn't seem relevant to the issue under test.
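   i.e. roughly (signature sketch only, folding in the `Status` return suggested in the
   other comment):
   ```cpp
   // Sketch: fix the chunk size inside the helper, since no caller overrides it.
   Status WriteParquetData(const std::string& path,
                           const std::shared_ptr<fs::FileSystem>& file_system,
                           const std::shared_ptr<Table>& input);
   ```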



##########
cpp/src/arrow/engine/substrait/serde_test.cc:
##########
@@ -1383,5 +1400,188 @@ TEST(Substrait, JoinPlanInvalidKeys) {
   }
 }
 
+TEST(Substrait, SerializeRelation) {
+#ifdef _WIN32
+  GTEST_SKIP() << "ARROW-16392: Substrait File URI not supported for Windows";
+#else
+  ExtensionSet ext_set;
+  auto dummy_schema = schema({field("foo", binary())});
+  // creating a dummy dataset using a dummy table
+  auto format = std::make_shared<arrow::dataset::ParquetFileFormat>();
+  auto filesystem = std::make_shared<fs::LocalFileSystem>();
+
+  std::vector<fs::FileInfo> files;
+  const std::vector<std::string> f_paths = {"/tmp/data1.parquet", "/tmp/data2.parquet"};
+
+  for (const auto& f_path : f_paths) {
+    ASSERT_OK_AND_ASSIGN(auto f_file, filesystem->GetFileInfo(f_path));
+    files.push_back(std::move(f_file));
+  }
+
+  ASSERT_OK_AND_ASSIGN(auto ds_factory, dataset::FileSystemDatasetFactory::Make(
+                                            std::move(filesystem), std::move(files),
+                                            std::move(format), {}));
+  ASSERT_OK_AND_ASSIGN(auto dataset, ds_factory->Finish(dummy_schema));
+
+  auto options = std::make_shared<dataset::ScanOptions>();
+  options->projection = compute::project({}, {});
+  auto scan_node_options = dataset::ScanNodeOptions{dataset, options};
+
+  auto scan_declaration = compute::Declaration({"scan", scan_node_options});
+
+  ASSERT_OK_AND_ASSIGN(auto serialized_rel,
+                       SerializeRelation(scan_declaration, &ext_set));
+  ASSERT_OK_AND_ASSIGN(auto deserialized_decl,
+                       DeserializeRelation(*serialized_rel, ext_set));
+
+  auto dataset_comparator = [](std::shared_ptr<dataset::Dataset> ds_lhs,
+                               std::shared_ptr<dataset::Dataset> ds_rhs) -> bool {
+    const auto& fsd_lhs = checked_cast<const dataset::FileSystemDataset&>(*ds_lhs);
+    const auto& fsd_rhs = checked_cast<const dataset::FileSystemDataset&>(*ds_lhs);
+    const auto& files_lhs = fsd_lhs.files();
+    const auto& files_rhs = fsd_rhs.files();
+
+    if (files_lhs.size() != files_rhs.size()) {
+      return false;
+    }
+    uint64_t fidx = 0;
+    for (const auto& l_file : files_lhs) {
+      if (l_file != files_rhs[fidx]) {
+        return false;
+      }
+      fidx++;
+    }
+    bool cmp_file_format = fsd_lhs.format()->Equals(*fsd_lhs.format());
+    bool cmp_file_system = fsd_lhs.filesystem()->Equals(fsd_rhs.filesystem());
+    return cmp_file_format && cmp_file_system;
+  };
+
+  auto scan_option_comparator = [dataset_comparator](
+                                    const dataset::ScanNodeOptions& lhs,
+                                    const dataset::ScanNodeOptions& rhs) -> bool {
+    bool cmp_rso = lhs.require_sequenced_output == rhs.require_sequenced_output;
+    bool cmp_ds = dataset_comparator(lhs.dataset, rhs.dataset);
+    return cmp_rso && cmp_ds;
+  };
+
+  EXPECT_EQ(deserialized_decl.factory_name, scan_declaration.factory_name);
+  const auto& lhs =
+      checked_cast<const dataset::ScanNodeOptions&>(*deserialized_decl.options);
+  const auto& rhs =
+      checked_cast<const dataset::ScanNodeOptions&>(*scan_declaration.options);
+  ASSERT_TRUE(scan_option_comparator(lhs, rhs));
+#endif
+}
+
+TEST(Substrait, SerializeRelationEndToEnd) {
+#ifdef _WIN32
+  GTEST_SKIP() << "ARROW-16392: Substrait File URI not supported for Windows";
+#else
+  ExtensionSet ext_set;
+  compute::ExecContext exec_context;
+
+  auto dummy_schema = schema({field("a", int32()), field("b", int32())});
+  auto table = TableFromJSON(dummy_schema, {R"([
+    [1, 1],
+    [3, 4]
+                        ])",
+                                            R"([
+    [0, 2],
+    [1, 3],
+    [4, 1],
+    [3, 1],
+    [1, 2]
+                        ])",
+                                            R"([
+    [2, 2],
+    [5, 3],
+    [1, 3]
+                        ])"});
+  // Note:: adding `/` to support the currently supported file read format from file
+  // system for Substrait

Review Comment:
   I'm not sure I understand this note.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
