westonpace commented on code in PR #14207:
URL: https://github.com/apache/arrow/pull/14207#discussion_r979028154


##########
cpp/src/arrow/compute/exec/plan_test.cc:
##########
@@ -296,6 +296,85 @@ TEST(ExecPlanExecution, TableSourceSinkError) {
               Raises(StatusCode::Invalid, HasSubstr("batch_size > 0")));
 }
 
+template <typename ElementType, typename OptionsType>
+void test_source_sink_error(
+    std::string source_factory_name,
+    std::function<Result<std::vector<ElementType>>(const BatchesWithSchema&)>
+        to_elements) {
+  ASSERT_OK_AND_ASSIGN(auto plan, ExecPlan::Make());
+  std::shared_ptr<Schema> no_schema;
+
+  auto exp_batches = MakeBasicBatches();
+  ASSERT_OK_AND_ASSIGN(auto elements, to_elements(exp_batches));
+  auto element_it_maker = [&elements]() {
+    return MakeVectorIterator<ElementType>(elements);
+  };
+
+  auto null_executor_options = OptionsType{exp_batches.schema, 
element_it_maker};
+  ASSERT_OK(MakeExecNode(source_factory_name, plan.get(), {}, 
null_executor_options));
+
+  auto null_schema_options = OptionsType{no_schema, element_it_maker};
+  ASSERT_THAT(MakeExecNode(source_factory_name, plan.get(), {}, 
null_schema_options),
+              Raises(StatusCode::Invalid, HasSubstr("not null")));
+}
+
+template <typename ElementType, typename OptionsType>
+void test_source_sink(

Review Comment:
   ```suggestion
   void TestSourceSink(
   ```



##########
cpp/src/arrow/compute/exec/options.h:
##########
@@ -77,6 +78,34 @@ class ARROW_EXPORT TableSourceNodeOptions : public ExecNodeOptions {
   int64_t max_batch_size;
 };
 
+/// \brief An extended Source node which accepts a schema
+///
+/// ItMaker is a maker of an iterator of tabular data.
+template <typename ItMaker>
+class ARROW_EXPORT SchemaSourceNodeOptions : public ExecNodeOptions {
+ public:
+  SchemaSourceNodeOptions(std::shared_ptr<Schema> schema, ItMaker it_maker)
+      : schema(schema), it_maker(std::move(it_maker)) {}
+
+  // the schema of the record batches from the iterator

Review Comment:
   ```suggestion
     /// the schema of the record batches from the iterator
   ```



##########
cpp/src/arrow/compute/exec/plan_test.cc:
##########
@@ -296,6 +296,85 @@ TEST(ExecPlanExecution, TableSourceSinkError) {
               Raises(StatusCode::Invalid, HasSubstr("batch_size > 0")));
 }
 
+template <typename ElementType, typename OptionsType>
+void test_source_sink_error(

Review Comment:
   ```suggestion
   void TestSourceSinkError(
   ```



##########
cpp/src/arrow/compute/exec/options.h:
##########
@@ -77,6 +78,34 @@ class ARROW_EXPORT TableSourceNodeOptions : public ExecNodeOptions {
   int64_t max_batch_size;
 };
 
+/// \brief An extended Source node which accepts a schema
+///
+/// ItMaker is a maker of an iterator of tabular data.
+template <typename ItMaker>
+class ARROW_EXPORT SchemaSourceNodeOptions : public ExecNodeOptions {
+ public:
+  SchemaSourceNodeOptions(std::shared_ptr<Schema> schema, ItMaker it_maker)
+      : schema(schema), it_maker(std::move(it_maker)) {}
+
+  // the schema of the record batches from the iterator
+  std::shared_ptr<Schema> schema;
+
+  // maker of an iterator which acts as the data source

Review Comment:
   ```suggestion
     /// maker of an iterator which acts as the data source
   ```



##########
cpp/src/arrow/compute/exec/options.h:
##########
@@ -77,6 +78,34 @@ class ARROW_EXPORT TableSourceNodeOptions : public ExecNodeOptions {
   int64_t max_batch_size;
 };
 
+/// \brief An extended Source node which accepts a schema
+///
+/// ItMaker is a maker of an iterator of tabular data.
+template <typename ItMaker>
+class ARROW_EXPORT SchemaSourceNodeOptions : public ExecNodeOptions {

Review Comment:
   I still think it is valuable to allow the user to specify the I/O executor;
   I just think it should default to the default I/O executor when none is
   given. So something like:
   
   ```cpp
   /// The executor to use to scan the iterator, defaults to the I/O executor
   Executor* io_executor = NULLPTR;
   ```



##########
cpp/src/arrow/compute/exec/plan_test.cc:
##########
@@ -296,6 +296,85 @@ TEST(ExecPlanExecution, TableSourceSinkError) {
               Raises(StatusCode::Invalid, HasSubstr("batch_size > 0")));
 }
 
+template <typename ElementType, typename OptionsType>
+void test_source_sink_error(
+    std::string source_factory_name,
+    std::function<Result<std::vector<ElementType>>(const BatchesWithSchema&)>
+        to_elements) {
+  ASSERT_OK_AND_ASSIGN(auto plan, ExecPlan::Make());
+  std::shared_ptr<Schema> no_schema;
+
+  auto exp_batches = MakeBasicBatches();
+  ASSERT_OK_AND_ASSIGN(auto elements, to_elements(exp_batches));
+  auto element_it_maker = [&elements]() {
+    return MakeVectorIterator<ElementType>(elements);
+  };
+
+  auto null_executor_options = OptionsType{exp_batches.schema, 
element_it_maker};
+  ASSERT_OK(MakeExecNode(source_factory_name, plan.get(), {}, 
null_executor_options));
+
+  auto null_schema_options = OptionsType{no_schema, element_it_maker};
+  ASSERT_THAT(MakeExecNode(source_factory_name, plan.get(), {}, 
null_schema_options),
+              Raises(StatusCode::Invalid, HasSubstr("not null")));
+}
+
+template <typename ElementType, typename OptionsType>
+void test_source_sink(
+    std::string source_factory_name,
+    std::function<Result<std::vector<ElementType>>(const BatchesWithSchema&)>
+        to_elements) {
+  ASSERT_OK_AND_ASSIGN(auto io_executor, arrow::internal::ThreadPool::Make(1));
+  ExecContext exec_context(default_memory_pool(), io_executor.get());
+  ASSERT_OK_AND_ASSIGN(auto plan, ExecPlan::Make(&exec_context));
+  AsyncGenerator<std::optional<ExecBatch>> sink_gen;
+
+  auto exp_batches = MakeBasicBatches();
+  ASSERT_OK_AND_ASSIGN(auto elements, to_elements(exp_batches));
+  auto element_it_maker = [&elements]() {
+    return MakeVectorIterator<ElementType>(elements);
+  };
+
+  ASSERT_OK(Declaration::Sequence({
+                                      {source_factory_name,
+                                       OptionsType{exp_batches.schema, 
element_it_maker}},
+                                      {"sink", SinkNodeOptions{&sink_gen}},
+                                  })
+                .AddToPlan(plan.get()));
+
+  ASSERT_THAT(StartAndCollect(plan.get(), sink_gen),
+              
Finishes(ResultWith(UnorderedElementsAreArray(exp_batches.batches))));
+}
+
+TEST(ExecPlanExecution, ArrayVectorSourceSink) {
+  test_source_sink<std::shared_ptr<ArrayVector>, ArrayVectorSourceNodeOptions>(
+      "array_vector_source", ToArrayVectors);
+}
+
+TEST(ExecPlanExecution, ArrayVectorSourceSinkError) {
+  test_source_sink_error<std::shared_ptr<ArrayVector>, 
ArrayVectorSourceNodeOptions>(
+      "array_vector_source", ToArrayVectors);
+}
+
+TEST(ExecPlanExecution, ExecBatchSourceSink) {
+  test_source_sink<std::shared_ptr<ExecBatch>, ExecBatchSourceNodeOptions>(
+      "exec_batch_source", ToExecBatches);
+}
+
+TEST(ExecPlanExecution, ExecBatchSourceSinkError) {
+  test_source_sink_error<std::shared_ptr<ExecBatch>, 
ExecBatchSourceNodeOptions>(
+      "exec_batch_source", ToExecBatches);
+}
+
+TEST(ExecPlanExecution, RecordBatchSourceSink) {
+  test_source_sink<std::shared_ptr<RecordBatch>, RecordBatchSourceNodeOptions>(
+      "record_batch_source", ToRecordBatches);
+}
+
+TEST(ExecPlanExecution, RecordBatchSourceSinkError) {
+  test_source_sink_error<std::shared_ptr<RecordBatch>, 
RecordBatchSourceNodeOptions>(
+      "record_batch_source", ToRecordBatches);
+}
+

Review Comment:
   ```suggestion
   TEST(ExecPlanExecution, ArrayVectorSourceSink) {
     TestSourceSink<std::shared_ptr<ArrayVector>, ArrayVectorSourceNodeOptions>(
         "array_vector_source", ToArrayVectors);
   }
   
   TEST(ExecPlanExecution, ArrayVectorSourceSinkError) {
     TestSourceSinkError<std::shared_ptr<ArrayVector>, 
ArrayVectorSourceNodeOptions>(
         "array_vector_source", ToArrayVectors);
   }
   
   TEST(ExecPlanExecution, ExecBatchSourceSink) {
     TestSourceSink<std::shared_ptr<ExecBatch>, ExecBatchSourceNodeOptions>(
         "exec_batch_source", ToExecBatches);
   }
   
   TEST(ExecPlanExecution, ExecBatchSourceSinkError) {
     TestSourceSinkError<std::shared_ptr<ExecBatch>, 
ExecBatchSourceNodeOptions>(
         "exec_batch_source", ToExecBatches);
   }
   
   TEST(ExecPlanExecution, RecordBatchSourceSink) {
     TestSourceSink<std::shared_ptr<RecordBatch>, RecordBatchSourceNodeOptions>(
         "record_batch_source", ToRecordBatches);
   }
   
   TEST(ExecPlanExecution, RecordBatchSourceSinkError) {
     TestSourceSinkError<std::shared_ptr<RecordBatch>, 
RecordBatchSourceNodeOptions>(
         "record_batch_source", ToRecordBatches);
   }
   
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@arrow.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to