westonpace commented on code in PR #12672: URL: https://github.com/apache/arrow/pull/12672#discussion_r846438505
########## cpp/src/arrow/engine/substrait/util.cc: ########## @@ -0,0 +1,137 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "arrow/engine/substrait/util.h" + +namespace arrow { + +namespace engine { + +Status SubstraitSinkConsumer::Consume(cp::ExecBatch batch) { + // Consume a batch of data + bool did_push = producer_.Push(batch); + if (!did_push) return Status::ExecutionError("Producer closed already"); + return Status::OK(); +} + +Future<> SubstraitSinkConsumer::Finish() { + producer_.Push(IterationEnd<arrow::util::optional<cp::ExecBatch>>()); + if (producer_.Close()) { + return Future<>::MakeFinished(); + } + return Future<>::MakeFinished( + Status::ExecutionError("Error occurred in closing the batch producer")); +} + +Status SubstraitSinkConsumer::Init(const std::shared_ptr<Schema>& schema) { + return Status::OK(); +} + +/// \brief An executor to run a Substrait Query +/// This interface is provided as a utility when creating language +/// bindings for consuming a Substrait plan. +class ARROW_ENGINE_EXPORT SubstraitExecutor { + public: + explicit SubstraitExecutor( + std::shared_ptr<Buffer> substrait_buffer, + AsyncGenerator<arrow::util::optional<cp::ExecBatch>>* generator, + std::shared_ptr<cp::ExecPlan> plan, cp::ExecContext exec_context) + : substrait_buffer_(substrait_buffer), + generator_(generator), + plan_(std::move(plan)), + exec_context_(exec_context) {} + + Result<std::shared_ptr<RecordBatchReader>> Execute() { + RETURN_NOT_OK(SubstraitExecutor::Init()); + for (const cp::Declaration& decl : declarations_) { + RETURN_NOT_OK(decl.AddToPlan(plan_.get()).status()); + } + ARROW_RETURN_NOT_OK(plan_->Validate()); + ARROW_RETURN_NOT_OK(plan_->StartProducing()); + // schema of the output can be obtained by the output_schema + // of the input to the sink node. + auto schema = plan_->sinks()[0]->inputs()[0]->output_schema(); + std::shared_ptr<RecordBatchReader> sink_reader = cp::MakeGeneratorReader( + schema, std::move(*generator_), exec_context_.memory_pool()); + return sink_reader; + } + + Status Close() { return plan_->finished().status(); } + + Status Init() { + if (substrait_buffer_ == NULLPTR) { + return Status::Invalid("Buffer containing Substrait plan is null."); + } + std::function<std::shared_ptr<cp::SinkNodeConsumer>()> consumer_factory = [&] { + return std::make_shared<SubstraitSinkConsumer>(generator_); + }; + ARROW_ASSIGN_OR_RAISE(declarations_, + engine::DeserializePlan(*substrait_buffer_, consumer_factory)); + return Status::OK(); + } + + private: + std::shared_ptr<Buffer> substrait_buffer_; + AsyncGenerator<arrow::util::optional<cp::ExecBatch>>* generator_; + std::vector<cp::Declaration> declarations_; + std::shared_ptr<cp::ExecPlan> plan_; + cp::ExecContext exec_context_; +}; + +arrow::PushGenerator<arrow::util::optional<cp::ExecBatch>>::Producer +SubstraitSinkConsumer::MakeProducer( + AsyncGenerator<arrow::util::optional<cp::ExecBatch>>* out_gen, + arrow::util::BackpressureOptions backpressure) { + arrow::PushGenerator<arrow::util::optional<cp::ExecBatch>> push_gen( + std::move(backpressure)); + auto out = push_gen.producer(); + *out_gen = std::move(push_gen); + return out; +} + +/// \brief Retrieve a RecordBatchReader from a Substrait plan in JSON. Review Comment: Docstrings for this method (and the next two) should go in the header file and not the cc file. ########## cpp/src/arrow/engine/substrait/serde_test.cc: ########## @@ -724,5 +729,52 @@ TEST(Substrait, ExtensionSetFromPlan) { EXPECT_EQ(decoded_add_func.name, "add"); } +Result<std::string> GetSubstraitJSON() { + ARROW_ASSIGN_OR_RAISE(std::string dir_string, + arrow::internal::GetEnvVar("PARQUET_TEST_DATA")); + auto file_name = + arrow::internal::PlatformFilename::FromString(dir_string)->Join("binary.parquet"); + auto file_path = file_name->ToString(); + + std::string substrait_json = R"({ + "relations": [ + {"rel": { + "read": { + "base_schema": { + "struct": { + "types": [ + {"binary": {}} + ] + }, + "names": [ + "foo" + ] + }, + "local_files": { + "items": [ + { + "uri_file": "file://FILENAME_PLACEHOLDER", + "format": "FILE_FORMAT_PARQUET" + } + ] + } + } + }} + ] + })"; + + std::string filename_placeholder = "FILENAME_PLACEHOLDER"; + substrait_json.replace(substrait_json.find(filename_placeholder), + filename_placeholder.size(), file_path); + return substrait_json; +} + +TEST(Substrait, GetRecordBatchReader) { + ASSERT_OK_AND_ASSIGN(std::string substrait_json, GetSubstraitJSON()); + ASSERT_OK_AND_ASSIGN(auto reader, engine::GetRecordBatchReader(substrait_json)); + ASSERT_OK_AND_ASSIGN(auto table, Table::FromRecordBatchReader(reader.get())); + EXPECT_GT(table->num_rows(), 0); +} + Review Comment: Can we add a test case with an invalid plan? ########## cpp/src/arrow/engine/substrait/util.h: ########## @@ -0,0 +1,67 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include <memory> +#include <string> +#include <vector> +#include "arrow/compute/type_fwd.h" +#include "arrow/engine/api.h" +#include "arrow/util/async_generator.h" +#include "arrow/util/iterator.h" +#include "arrow/util/optional.h" + +namespace arrow { + +namespace cp = arrow::compute; + +namespace engine { + +/// \brief A SinkNodeConsumer specialized to output ExecBatches via PushGenerator +class ARROW_ENGINE_EXPORT SubstraitSinkConsumer : public cp::SinkNodeConsumer { + public: + explicit SubstraitSinkConsumer( + AsyncGenerator<arrow::util::optional<cp::ExecBatch>>* generator, + arrow::util::BackpressureOptions backpressure = {}) + : producer_(MakeProducer(generator, std::move(backpressure))) {} + + Status Consume(cp::ExecBatch batch) override; + + Status Init(const std::shared_ptr<Schema>& schema) override; + + static arrow::PushGenerator<arrow::util::optional<cp::ExecBatch>>::Producer + MakeProducer(AsyncGenerator<arrow::util::optional<cp::ExecBatch>>* out_gen, + arrow::util::BackpressureOptions backpressure); + + Future<> Finish() override; + + private: + PushGenerator<arrow::util::optional<cp::ExecBatch>>::Producer producer_; +}; + +Result<std::shared_ptr<RecordBatchReader>> GetRecordBatchReader( + std::string& substrait_json); + +Result<std::shared_ptr<RecordBatchReader>> GetRecordBatchReader( + std::shared_ptr<Buffer> substrait_buffer); + +Result<std::shared_ptr<Buffer>> GetSubstraitBufferFromJSON(std::string& substrait_json); Review Comment: Technically this method probably isn't very useful to users but I don't see too much harm in leaving it in. ########## cpp/src/arrow/engine/substrait/util.h: ########## @@ -0,0 +1,67 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include <memory> +#include <string> +#include <vector> +#include "arrow/compute/type_fwd.h" +#include "arrow/engine/api.h" +#include "arrow/util/async_generator.h" +#include "arrow/util/iterator.h" +#include "arrow/util/optional.h" + +namespace arrow { + +namespace cp = arrow::compute; + +namespace engine { + +/// \brief A SinkNodeConsumer specialized to output ExecBatches via PushGenerator +class ARROW_ENGINE_EXPORT SubstraitSinkConsumer : public cp::SinkNodeConsumer { + public: + explicit SubstraitSinkConsumer( + AsyncGenerator<arrow::util::optional<cp::ExecBatch>>* generator, + arrow::util::BackpressureOptions backpressure = {}) + : producer_(MakeProducer(generator, std::move(backpressure))) {} + + Status Consume(cp::ExecBatch batch) override; + + Status Init(const std::shared_ptr<Schema>& schema) override; + + static arrow::PushGenerator<arrow::util::optional<cp::ExecBatch>>::Producer + MakeProducer(AsyncGenerator<arrow::util::optional<cp::ExecBatch>>* out_gen, + arrow::util::BackpressureOptions backpressure); + + Future<> Finish() override; + + private: + PushGenerator<arrow::util::optional<cp::ExecBatch>>::Producer producer_; +}; + +Result<std::shared_ptr<RecordBatchReader>> GetRecordBatchReader( + std::string& substrait_json); + +Result<std::shared_ptr<RecordBatchReader>> GetRecordBatchReader( + std::shared_ptr<Buffer> substrait_buffer); + +Result<std::shared_ptr<Buffer>> GetSubstraitBufferFromJSON(std::string& substrait_json); Review Comment: These methods will need `ARROW_ENGINE_EXPORT` ########## cpp/src/arrow/engine/substrait/util.h: ########## @@ -0,0 +1,67 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include <memory> +#include <string> +#include <vector> +#include "arrow/compute/type_fwd.h" +#include "arrow/engine/api.h" +#include "arrow/util/async_generator.h" +#include "arrow/util/iterator.h" +#include "arrow/util/optional.h" + +namespace arrow { + +namespace cp = arrow::compute; + +namespace engine { + +/// \brief A SinkNodeConsumer specialized to output ExecBatches via PushGenerator +class ARROW_ENGINE_EXPORT SubstraitSinkConsumer : public cp::SinkNodeConsumer { Review Comment: Sorry for not realizing this earlier but this entire class can be moved into the `.cc` file. Then we can get rid of the `#include "arrow/util/async_generator.h"` which we've been trying to keep out of public header files. ########## python/pyarrow/tests/test_substrait.py: ########## @@ -0,0 +1,103 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +import os +import pathlib +from pyarrow.lib import tobytes +import pyarrow.parquet as pq + +try: + from pyarrow import engine + from pyarrow.engine import ( + run_query, + get_buffer_from_json, + ) +except ImportError: + engine = None + +# Marks all of the tests in this module +# Ignore these with pytest ... -m 'not engine' +pytestmark = pytest.mark.engine + + Review Comment: Let's add a test case with an invalid plan here too. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
