vibhatha commented on code in PR #14118: URL: https://github.com/apache/arrow/pull/14118#discussion_r978862413
########## cpp/src/arrow/engine/substrait/serde_test.cc: ########## @@ -2082,43 +2103,135 @@ TEST(Substrait, BasicPlanRoundTrippingEndToEnd) { auto sink_decls, DeserializePlans( *serialized_plan, [] { return kNullConsumer; }, ext_id_reg, &ext_set)); - // filter declaration - auto& roundtripped_filter = std::get<compute::Declaration>(sink_decls[0].inputs[0]); - const auto& filter_opts = - checked_cast<const compute::FilterNodeOptions&>(*(roundtripped_filter.options)); - auto roundtripped_expr = filter_opts.filter_expression; - - if (auto* call = roundtripped_expr.call()) { - EXPECT_EQ(call->function_name, "equal"); - auto args = call->arguments; - auto left_index = args[0].field_ref()->field_path()->indices()[0]; - EXPECT_EQ(dummy_schema->field_names()[left_index], filter_col_left); - auto right_index = args[1].field_ref()->field_path()->indices()[0]; - EXPECT_EQ(dummy_schema->field_names()[right_index], filter_col_right); - } - // scan declaration + // assert filter declaration + const auto& roundtripped_filter = + std::get<compute::Declaration>(sink_decls[0].inputs[0]); + AssertFilterRelation(roundtripped_filter, std::move(filter_expr), dummy_schema); + // assert scan declaration const auto& roundtripped_scan = std::get<compute::Declaration>(roundtripped_filter.inputs[0]); - const auto& dataset_opts = - checked_cast<const dataset::ScanNodeOptions&>(*(roundtripped_scan.options)); - const auto& roundripped_ds = dataset_opts.dataset; - EXPECT_TRUE(roundripped_ds->schema()->Equals(*dummy_schema)); - ASSERT_OK_AND_ASSIGN(auto roundtripped_frgs, roundripped_ds->GetFragments()); - ASSERT_OK_AND_ASSIGN(auto expected_frgs, dataset->GetFragments()); + AssertScanRelation(roundtripped_scan, dataset, dummy_schema); + // assert results + AssertPlanExecutionResult(expected_table, roundtripped_filter, dummy_schema, + exec_context); +} - auto roundtrip_frg_vec = IteratorToVector(std::move(roundtripped_frgs)); - auto expected_frg_vec = IteratorToVector(std::move(expected_frgs)); - EXPECT_EQ(expected_frg_vec.size(), roundtrip_frg_vec.size()); - int64_t idx = 0; - for (auto fragment : expected_frg_vec) { - const auto* l_frag = checked_cast<const dataset::FileFragment*>(fragment.get()); - const auto* r_frag = - checked_cast<const dataset::FileFragment*>(roundtrip_frg_vec[idx++].get()); - EXPECT_TRUE(l_frag->Equals(*r_frag)); +TEST(Substrait, FilterProjectPlanRoundTripping) { +#ifdef _WIN32 + GTEST_SKIP() << "ARROW-16392: Substrait File URI not supported for Windows"; +#endif + compute::ExecContext exec_context; + arrow::dataset::internal::Initialize(); + + auto dummy_schema = schema( + {field("key", int32()), field("shared", int32()), field("distinct", int32())}); + + // creating a dummy dataset using a dummy table + auto table = TableFromJSON(dummy_schema, {R"([ + [1, 1, 10], + [3, 4, 20] + ])", + R"([ + [0, 2, 1], + [1, 3, 2], + [4, 1, 3], + [3, 1, 3], + [1, 2, 5] + ])", + R"([ + [2, 2, 12], + [5, 3, 12], + [1, 3, 12] + ])"}); + + auto format = std::make_shared<arrow::dataset::IpcFileFormat>(); + auto filesystem = std::make_shared<fs::LocalFileSystem>(); + const std::string file_name = "serde_project_test.arrow"; + + ASSERT_OK_AND_ASSIGN(auto tempdir, + arrow::internal::TemporaryDir::Make("substrait-tempdir-project-")); + ASSERT_OK_AND_ASSIGN(auto file_path, tempdir->path().Join(file_name)); + std::string file_path_str = file_path.ToString(); + + WriteIpcData(file_path_str, filesystem, table); + + std::vector<fs::FileInfo> files; + const std::vector<std::string> f_paths = {file_path_str}; + + for (const auto& f_path : f_paths) { + ASSERT_OK_AND_ASSIGN(auto f_file, filesystem->GetFileInfo(f_path)); + files.push_back(std::move(f_file)); } - ASSERT_OK_AND_ASSIGN(auto rnd_trp_table, - GetTableFromPlan(roundtripped_filter, exec_context, dummy_schema)); - EXPECT_TRUE(expected_table->Equals(*rnd_trp_table)); + + ASSERT_OK_AND_ASSIGN(auto ds_factory, dataset::FileSystemDatasetFactory::Make( + filesystem, std::move(files), format, {})); + ASSERT_OK_AND_ASSIGN(auto dataset, ds_factory->Finish(dummy_schema)); + + auto scan_options = std::make_shared<dataset::ScanOptions>(); + scan_options->projection = compute::project({}, {}); + compute::Expression project_fp_expr = compute::call( + "add", {compute::field_ref(FieldPath({1})), compute::field_ref(FieldPath({2}))}); + // Acero only outputs the expressions mentioned in the projection but Substrait + // outputs existing columns plus the expression. + // For validation purpose, a project expression with expected projection + // field plus existing fields is created for Acero. + // And for Substrait just the project expression with expected projection. + std::vector<compute::Expression> acero_project_fp_exprs = { + compute::field_ref(FieldPath({0})), compute::field_ref(FieldPath({1})), + compute::field_ref(FieldPath({2})), project_fp_expr}; + std::vector<compute::Expression> substrait_project_fp_exprs = {project_fp_expr}; + + auto acero_declarations = compute::Declaration::Sequence( + {compute::Declaration( + {"scan", dataset::ScanNodeOptions{dataset, scan_options}, "s"}), + compute::Declaration( + {"project", compute::ProjectNodeOptions{acero_project_fp_exprs}, "p"})}); + + // adding the project expression field to schema + ASSERT_OK_AND_ASSIGN(auto project_schema, + dummy_schema->AddField(dummy_schema->num_fields(), + field("add(shared,distinct)", int32()))); + + ASSERT_OK_AND_ASSIGN( + auto expected_table, + GetTableFromPlan(acero_declarations, exec_context, project_schema)); + + std::shared_ptr<ExtensionIdRegistry> sp_ext_id_reg = MakeExtensionIdRegistry(); + ExtensionIdRegistry* ext_id_reg = sp_ext_id_reg.get(); + ExtensionSet ext_set(ext_id_reg); + + // declaration which Substrait would expect since the idea is to do the projection + // which would produce an equivalent result to the declaration created for Acero with + // additional project fields. + auto substrait_declarations = compute::Declaration::Sequence( + {compute::Declaration( + {"scan", dataset::ScanNodeOptions{dataset, scan_options}, "s"}), + compute::Declaration( + {"project", compute::ProjectNodeOptions{substrait_project_fp_exprs}, "p"})}); + + ASSERT_OK_AND_ASSIGN(auto serialized_plan, + SerializePlan(substrait_declarations, &ext_set)); + + ASSERT_OK_AND_ASSIGN( + auto sink_decls, + DeserializePlans( + *serialized_plan, [] { return kNullConsumer; }, ext_id_reg, &ext_set)); + // assert project declaration + const auto& roundtripped_project = + std::get<compute::Declaration>(sink_decls[0].inputs[0]); + // assert project declaration + // Note: the provided expressions for Substrait declaration only contains one + // expression, but substrait produces expressions for the existing number of fields plus + // provided expression. Since the output expressions from the deserialized relation + // contains fields which weren't used in the project expression. + AssertProjectRelation(roundtripped_project, acero_project_fp_exprs, project_schema); Review Comment: Fair enough. Let's try that route. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: github-unsubscr...@arrow.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org