rtpsw commented on code in PR #14041:
URL: https://github.com/apache/arrow/pull/14041#discussion_r967704194


##########
cpp/src/arrow/compute/exec/source_node.cc:
##########
@@ -291,13 +291,194 @@ struct TableSourceNode : public SourceNode {
   }
 };
 
+template <typename This, typename Options>
+struct SchemaSourceNode : public SourceNode {
+  SchemaSourceNode(ExecPlan* plan, std::shared_ptr<Schema> schema,
+                   arrow::AsyncGenerator<util::optional<ExecBatch>> generator)
+      : SourceNode(plan, schema, generator) {}
+
+  static Result<ExecNode*> Make(ExecPlan* plan, std::vector<ExecNode*> inputs,
+                                const ExecNodeOptions& options) {
+    RETURN_NOT_OK(ValidateExecNodeInputs(plan, inputs, 0, This::kKindName));
+    const auto& cast_options = checked_cast<const Options&>(options);
+    auto& it_maker = cast_options.it_maker;
+    auto& schema = cast_options.schema;
+
+    auto io_executor = plan->exec_context()->executor();
+    auto it = it_maker();
+
+    RETURN_NOT_OK(ValidateSchemaSourceNodeInput(io_executor, schema, 
This::kKindName));
+    ARROW_ASSIGN_OR_RAISE(auto generator, This::MakeGenerator(it, io_executor, 
schema));
+    return plan->EmplaceNode<This>(plan, schema, generator);
+  }
+
+  static arrow::Status ValidateSchemaSourceNodeInput(
+      arrow::internal::Executor* io_executor, const std::shared_ptr<Schema>& 
schema,
+      const char* kKindName) {
+    if (schema == NULLPTR) {
+      return Status::Invalid(kKindName, " requires schema which is not null");
+    }
+    if (io_executor == NULLPTR) {
+      return Status::Invalid(kKindName, " requires IO-executor which is not 
null");
+    }
+
+    return Status::OK();
+  }
+
+  template <typename Item>
+  static Iterator<Enumerated<Item>> MakeEnumeratedIterator(Iterator<Item> it) {
+    // TODO: Should Enumerated<>.index be changed to int64_t? Currently, this 
change
+    // causes dataset unit-test failures
+    using index_t = decltype(Enumerated<Item>{}.index);
+    struct {
+      index_t index = 0;
+      Enumerated<Item> operator()(const Item& item) {
+        return Enumerated<Item>{item, index++, false};
+      }
+    } enumerator;
+    return MakeMapIterator(std::move(enumerator), std::move(it));
+  }
+
+  template <typename Item>
+  static arrow::AsyncGenerator<Item> MakeUnenumeratedGenerator(
+      const arrow::AsyncGenerator<Enumerated<Item>>& enum_gen) {
+    using Enum = Enumerated<Item>;
+    return MakeMappedGenerator(enum_gen, [](const Enum& e) { return e.value; 
});
+  }
+
+  template <typename Item>
+  static arrow::AsyncGenerator<Item> MakeOrderedGenerator(
+      const arrow::AsyncGenerator<Enumerated<Item>>& unordered_gen) {
+    using Enum = Enumerated<Item>;
+    auto enum_gen = MakeSequencingGenerator(
+        unordered_gen,
+        /*compare=*/[](const Enum& a, const Enum& b) { return a.index > 
b.index; },
+        /*is_next=*/[](const Enum& a, const Enum& b) { return a.index + 1 == 
b.index; },
+        /*initial_value=*/Enum{{}, 0});
+    return MakeUnenumeratedGenerator(enum_gen);
+  }
+};
+
+struct RecordBatchSourceNode
+    : public SchemaSourceNode<RecordBatchSourceNode, 
RecordBatchSourceNodeOptions> {
+  using RecordBatchSchemaSourceNode =
+      SchemaSourceNode<RecordBatchSourceNode, RecordBatchSourceNodeOptions>;
+
+  using RecordBatchSchemaSourceNode::RecordBatchSchemaSourceNode;
+
+  static Result<ExecNode*> Make(ExecPlan* plan, std::vector<ExecNode*> inputs,
+                                const ExecNodeOptions& options) {
+    return RecordBatchSchemaSourceNode::Make(plan, inputs, options);
+  }
+
+  const char* kind_name() const override { return kKindName; }
+
+  static Result<arrow::AsyncGenerator<util::optional<ExecBatch>>> 
MakeGenerator(
+      Iterator<std::shared_ptr<RecordBatch>>& batch_it,
+      arrow::internal::Executor* io_executor, const std::shared_ptr<Schema>& 
schema) {
+    auto to_exec_batch =
+        [schema](const std::shared_ptr<RecordBatch>& batch) -> 
util::optional<ExecBatch> {
+      if (batch == NULLPTR || *batch->schema() != *schema) {
+        return util::nullopt;
+      }
+      return util::optional<ExecBatch>(ExecBatch(*batch));
+    };
+    auto exec_batch_it = MakeMapIterator(to_exec_batch, std::move(batch_it));
+    auto enum_it = MakeEnumeratedIterator(std::move(exec_batch_it));
+    ARROW_ASSIGN_OR_RAISE(auto enum_gen,
+                          MakeBackgroundGenerator(std::move(enum_it), 
io_executor));
+    return MakeUnenumeratedGenerator(std::move(enum_gen));

Review Comment:
   It turns out my solution doesn't solve the problem: as before, I still see 
infrequent cases of out-of-order delivery when the IO-context has multiple 
threads. Even after examining a fair amount of Arrow code related to 
generators and trying a couple of things, I still don't have a good idea of how 
to fix this. On the other hand, it looks like I'm not the only one to have hit 
this, because there is an existing [unit test involving 
`SourceNode`](https://github.com/apache/arrow/blob/f184255cbb9bf911ea2a04910f711e1a924b12b8/cpp/src/arrow/compute/exec/plan_test.cc#L1429-L1459)
 that checks that the result batches match while ignoring their order. This 
suggests that the out-of-order-batches problem may originate in `SourceNode` 
itself. I won't try to fix this problem in this PR and will instead resort to 
ignoring order in the unit tests.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to