westonpace commented on code in PR #14207: URL: https://github.com/apache/arrow/pull/14207#discussion_r979028154
########## cpp/src/arrow/compute/exec/plan_test.cc: ########## @@ -296,6 +296,85 @@ TEST(ExecPlanExecution, TableSourceSinkError) { Raises(StatusCode::Invalid, HasSubstr("batch_size > 0"))); } +template <typename ElementType, typename OptionsType> +void test_source_sink_error( + std::string source_factory_name, + std::function<Result<std::vector<ElementType>>(const BatchesWithSchema&)> + to_elements) { + ASSERT_OK_AND_ASSIGN(auto plan, ExecPlan::Make()); + std::shared_ptr<Schema> no_schema; + + auto exp_batches = MakeBasicBatches(); + ASSERT_OK_AND_ASSIGN(auto elements, to_elements(exp_batches)); + auto element_it_maker = [&elements]() { + return MakeVectorIterator<ElementType>(elements); + }; + + auto null_executor_options = OptionsType{exp_batches.schema, element_it_maker}; + ASSERT_OK(MakeExecNode(source_factory_name, plan.get(), {}, null_executor_options)); + + auto null_schema_options = OptionsType{no_schema, element_it_maker}; + ASSERT_THAT(MakeExecNode(source_factory_name, plan.get(), {}, null_schema_options), + Raises(StatusCode::Invalid, HasSubstr("not null"))); +} + +template <typename ElementType, typename OptionsType> +void test_source_sink( Review Comment: ```suggestion void TestSourceSink( ``` ########## cpp/src/arrow/compute/exec/options.h: ########## @@ -77,6 +78,34 @@ class ARROW_EXPORT TableSourceNodeOptions : public ExecNodeOptions { int64_t max_batch_size; }; +/// \brief An extended Source node which accepts a schema +/// +/// ItMaker is a maker of an iterator of tabular data. 
+template <typename ItMaker> +class ARROW_EXPORT SchemaSourceNodeOptions : public ExecNodeOptions { + public: + SchemaSourceNodeOptions(std::shared_ptr<Schema> schema, ItMaker it_maker) + : schema(schema), it_maker(std::move(it_maker)) {} + + // the schema of the record batches from the iterator Review Comment: ```suggestion /// the schema of the record batches from the iterator ``` ########## cpp/src/arrow/compute/exec/plan_test.cc: ########## @@ -296,6 +296,85 @@ TEST(ExecPlanExecution, TableSourceSinkError) { Raises(StatusCode::Invalid, HasSubstr("batch_size > 0"))); } +template <typename ElementType, typename OptionsType> +void test_source_sink_error( Review Comment: ```suggestion void TestSourceSinkError( ``` ########## cpp/src/arrow/compute/exec/options.h: ########## @@ -77,6 +78,34 @@ class ARROW_EXPORT TableSourceNodeOptions : public ExecNodeOptions { int64_t max_batch_size; }; +/// \brief An extended Source node which accepts a schema +/// +/// ItMaker is a maker of an iterator of tabular data. +template <typename ItMaker> +class ARROW_EXPORT SchemaSourceNodeOptions : public ExecNodeOptions { + public: + SchemaSourceNodeOptions(std::shared_ptr<Schema> schema, ItMaker it_maker) + : schema(schema), it_maker(std::move(it_maker)) {} + + // the schema of the record batches from the iterator + std::shared_ptr<Schema> schema; + + // maker of an iterator which acts as the data source Review Comment: ```suggestion /// maker of an iterator which acts as the data source ``` ########## cpp/src/arrow/compute/exec/options.h: ########## @@ -77,6 +78,34 @@ class ARROW_EXPORT TableSourceNodeOptions : public ExecNodeOptions { int64_t max_batch_size; }; +/// \brief An extended Source node which accepts a schema +/// +/// ItMaker is a maker of an iterator of tabular data. +template <typename ItMaker> +class ARROW_EXPORT SchemaSourceNodeOptions : public ExecNodeOptions { Review Comment: I still think it is valuable to allow the user to specify the I/O executor. 
I just think it should default to the default I/O executor. So something like... /// The executor to use to scan the iterator, defaults to the I/O executor Executor* io_executor = NULLPTR; ########## cpp/src/arrow/compute/exec/plan_test.cc: ########## @@ -296,6 +296,85 @@ TEST(ExecPlanExecution, TableSourceSinkError) { Raises(StatusCode::Invalid, HasSubstr("batch_size > 0"))); } +template <typename ElementType, typename OptionsType> +void test_source_sink_error( + std::string source_factory_name, + std::function<Result<std::vector<ElementType>>(const BatchesWithSchema&)> + to_elements) { + ASSERT_OK_AND_ASSIGN(auto plan, ExecPlan::Make()); + std::shared_ptr<Schema> no_schema; + + auto exp_batches = MakeBasicBatches(); + ASSERT_OK_AND_ASSIGN(auto elements, to_elements(exp_batches)); + auto element_it_maker = [&elements]() { + return MakeVectorIterator<ElementType>(elements); + }; + + auto null_executor_options = OptionsType{exp_batches.schema, element_it_maker}; + ASSERT_OK(MakeExecNode(source_factory_name, plan.get(), {}, null_executor_options)); + + auto null_schema_options = OptionsType{no_schema, element_it_maker}; + ASSERT_THAT(MakeExecNode(source_factory_name, plan.get(), {}, null_schema_options), + Raises(StatusCode::Invalid, HasSubstr("not null"))); +} + +template <typename ElementType, typename OptionsType> +void test_source_sink( + std::string source_factory_name, + std::function<Result<std::vector<ElementType>>(const BatchesWithSchema&)> + to_elements) { + ASSERT_OK_AND_ASSIGN(auto io_executor, arrow::internal::ThreadPool::Make(1)); + ExecContext exec_context(default_memory_pool(), io_executor.get()); + ASSERT_OK_AND_ASSIGN(auto plan, ExecPlan::Make(&exec_context)); + AsyncGenerator<std::optional<ExecBatch>> sink_gen; + + auto exp_batches = MakeBasicBatches(); + ASSERT_OK_AND_ASSIGN(auto elements, to_elements(exp_batches)); + auto element_it_maker = [&elements]() { + return MakeVectorIterator<ElementType>(elements); + }; + + 
ASSERT_OK(Declaration::Sequence({ + {source_factory_name, + OptionsType{exp_batches.schema, element_it_maker}}, + {"sink", SinkNodeOptions{&sink_gen}}, + }) + .AddToPlan(plan.get())); + + ASSERT_THAT(StartAndCollect(plan.get(), sink_gen), + Finishes(ResultWith(UnorderedElementsAreArray(exp_batches.batches)))); +} + +TEST(ExecPlanExecution, ArrayVectorSourceSink) { + test_source_sink<std::shared_ptr<ArrayVector>, ArrayVectorSourceNodeOptions>( + "array_vector_source", ToArrayVectors); +} + +TEST(ExecPlanExecution, ArrayVectorSourceSinkError) { + test_source_sink_error<std::shared_ptr<ArrayVector>, ArrayVectorSourceNodeOptions>( + "array_vector_source", ToArrayVectors); +} + +TEST(ExecPlanExecution, ExecBatchSourceSink) { + test_source_sink<std::shared_ptr<ExecBatch>, ExecBatchSourceNodeOptions>( + "exec_batch_source", ToExecBatches); +} + +TEST(ExecPlanExecution, ExecBatchSourceSinkError) { + test_source_sink_error<std::shared_ptr<ExecBatch>, ExecBatchSourceNodeOptions>( + "exec_batch_source", ToExecBatches); +} + +TEST(ExecPlanExecution, RecordBatchSourceSink) { + test_source_sink<std::shared_ptr<RecordBatch>, RecordBatchSourceNodeOptions>( + "record_batch_source", ToRecordBatches); +} + +TEST(ExecPlanExecution, RecordBatchSourceSinkError) { + test_source_sink_error<std::shared_ptr<RecordBatch>, RecordBatchSourceNodeOptions>( + "record_batch_source", ToRecordBatches); +} + Review Comment: ```suggestion TEST(ExecPlanExecution, ArrayVectorSourceSink) { TestSourceSink<std::shared_ptr<ArrayVector>, ArrayVectorSourceNodeOptions>( "array_vector_source", ToArrayVectors); } TEST(ExecPlanExecution, ArrayVectorSourceSinkError) { TestSourceSinkError<std::shared_ptr<ArrayVector>, ArrayVectorSourceNodeOptions>( "array_vector_source", ToArrayVectors); } TEST(ExecPlanExecution, ExecBatchSourceSink) { TestSourceSink<std::shared_ptr<ExecBatch>, ExecBatchSourceNodeOptions>( "exec_batch_source", ToExecBatches); } TEST(ExecPlanExecution, ExecBatchSourceSinkError) { 
TestSourceSinkError<std::shared_ptr<ExecBatch>, ExecBatchSourceNodeOptions>( "exec_batch_source", ToExecBatches); } TEST(ExecPlanExecution, RecordBatchSourceSink) { TestSourceSink<std::shared_ptr<RecordBatch>, RecordBatchSourceNodeOptions>( "record_batch_source", ToRecordBatches); } TEST(ExecPlanExecution, RecordBatchSourceSinkError) { TestSourceSinkError<std::shared_ptr<RecordBatch>, RecordBatchSourceNodeOptions>( "record_batch_source", ToRecordBatches); } ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: github-unsubscr...@arrow.apache.org. For queries about this service, please contact Infrastructure at: us...@infra.apache.org