jvanstraten commented on code in PR #13914: URL: https://github.com/apache/arrow/pull/13914#discussion_r951173599
########## cpp/src/arrow/engine/substrait/relation_internal.cc: ########## @@ -248,19 +286,56 @@ Result<DeclarationInfo> FromProto(const substrait::Rel& rel, const ExtensionSet& for (int i = 0; i < input.num_columns; i++) { expressions.emplace_back(compute::field_ref(FieldRef(i))); } + std::vector<std::shared_ptr<Field>> new_fields(project.expressions().size()); + int i = 0; + auto project_schema = input.output_schema; for (const auto& expr : project.expressions()) { - expressions.emplace_back(); - ARROW_ASSIGN_OR_RAISE(expressions.back(), + ARROW_ASSIGN_OR_RAISE(compute::Expression des_expr, FromProto(expr, ext_set, conversion_options)); + auto bound_expr = des_expr.Bind(*input.output_schema); + if (auto* expr_call = bound_expr->call()) { + new_fields[i] = field(expr_call->function_name, + expr_call->kernel->signature->out_type().type()); + } else if (auto* field_ref = des_expr.field_ref()) { + ARROW_ASSIGN_OR_RAISE(FieldPath field_path, + field_ref->FindOne(*input.output_schema)); + ARROW_ASSIGN_OR_RAISE(new_fields[i], field_path.Get(*input.output_schema)); + } else if (auto* literal = des_expr.literal()) { + new_fields[i] = + field("field_" + std::to_string(input.num_columns + i), literal->type()); + } + i++; + expressions.emplace_back(des_expr); + } + while (!new_fields.empty()) { + auto field = new_fields.back(); + ARROW_ASSIGN_OR_RAISE( + project_schema, + project_schema->AddField( + input.num_columns + static_cast<int>(new_fields.size()) - 1, + std::move(field))); + new_fields.pop_back(); Review Comment: I'm pretty sure this adds the fields in reverse order, for what it's worth. Why is this in a separate loop? ########## cpp/src/arrow/engine/substrait/relation_internal.cc: ########## @@ -399,17 +506,45 @@ Result<DeclarationInfo> FromProto(const substrait::Rel& rel, const ExtensionSet& ExtensionIdRegistry::SubstraitAggregateToArrow converter, ext_set.registry()->GetSubstraitAggregateToArrow(aggregate_call.id())); ARROW_ASSIGN_OR_RAISE(compute::Aggregate arrow_agg, converter(aggregate_call)); + + // find aggregate field ids from schema + const auto field_ref = arrow_agg.target; + ARROW_ASSIGN_OR_RAISE(auto match, field_ref.FindOne(*input_schema)); + agg_src_field_ids[measure_id] = match[0]; + aggregates.push_back(std::move(arrow_agg)); } else { return Status::Invalid("substrait::AggregateFunction not provided"); } } + FieldVector output_fields; + output_fields.reserve(key_field_ids.size() + agg_src_field_ids.size()); + // extract aggregate fields to output schema + for (int id = 0; id < static_cast<int>(agg_src_field_ids.size()); id++) { + output_fields.emplace_back(input_schema->field(agg_src_field_ids[id])); + } + // extract key fields to output schema + for (int id = 0; id < static_cast<int>(key_field_ids.size()); id++) { + output_fields.emplace_back(input_schema->field(key_field_ids[id])); + } Review Comment: Wrong order; keys come first. > The list of distinct columns from each grouping set (ordered by their first appearance) followed by the list of measures in declaration order, [...] https://substrait.io/relations/logical_relations/#aggregate-operation ########## cpp/src/arrow/engine/substrait/relation_internal.h: ########## @@ -38,6 +38,8 @@ struct DeclarationInfo { /// The number of columns returned by the declaration. int num_columns; + + std::shared_ptr<Schema> output_schema; Review Comment: I feel like this supersedes `num_columns`; `num_columns` should always equal the number of fields in the schema. I only put that there because I didn't want to have to bother with the whole schema yet. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: github-unsubscr...@arrow.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org