rtpsw commented on code in PR #14041:
URL: https://github.com/apache/arrow/pull/14041#discussion_r967704194
##########
cpp/src/arrow/compute/exec/source_node.cc:
##########
@@ -291,13 +291,194 @@ struct TableSourceNode : public SourceNode {
}
};
+template <typename This, typename Options>
+struct SchemaSourceNode : public SourceNode {
+ SchemaSourceNode(ExecPlan* plan, std::shared_ptr<Schema> schema,
+ arrow::AsyncGenerator<util::optional<ExecBatch>> generator)
+ : SourceNode(plan, schema, generator) {}
+
+ static Result<ExecNode*> Make(ExecPlan* plan, std::vector<ExecNode*> inputs,
+ const ExecNodeOptions& options) {
+ RETURN_NOT_OK(ValidateExecNodeInputs(plan, inputs, 0, This::kKindName));
+ const auto& cast_options = checked_cast<const Options&>(options);
+ auto& it_maker = cast_options.it_maker;
+ auto& schema = cast_options.schema;
+
+ auto io_executor = plan->exec_context()->executor();
+ auto it = it_maker();
+
+ RETURN_NOT_OK(ValidateSchemaSourceNodeInput(io_executor, schema,
This::kKindName));
+ ARROW_ASSIGN_OR_RAISE(auto generator, This::MakeGenerator(it, io_executor,
schema));
+ return plan->EmplaceNode<This>(plan, schema, generator);
+ }
+
+ static arrow::Status ValidateSchemaSourceNodeInput(
+ arrow::internal::Executor* io_executor, const std::shared_ptr<Schema>&
schema,
+ const char* kKindName) {
+ if (schema == NULLPTR) {
+ return Status::Invalid(kKindName, " requires schema which is not null");
+ }
+ if (io_executor == NULLPTR) {
+ return Status::Invalid(kKindName, " requires IO-executor which is not
null");
+ }
+
+ return Status::OK();
+ }
+
+ template <typename Item>
+ static Iterator<Enumerated<Item>> MakeEnumeratedIterator(Iterator<Item> it) {
+ // TODO: Should Enumerated<>.index be changed to int64_t? Currently, this
change
+ // causes dataset unit-test failures
+ using index_t = decltype(Enumerated<Item>{}.index);
+ struct {
+ index_t index = 0;
+ Enumerated<Item> operator()(const Item& item) {
+ return Enumerated<Item>{item, index++, false};
+ }
+ } enumerator;
+ return MakeMapIterator(std::move(enumerator), std::move(it));
+ }
+
+ template <typename Item>
+ static arrow::AsyncGenerator<Item> MakeUnenumeratedGenerator(
+ const arrow::AsyncGenerator<Enumerated<Item>>& enum_gen) {
+ using Enum = Enumerated<Item>;
+ return MakeMappedGenerator(enum_gen, [](const Enum& e) { return e.value;
});
+ }
+
+ template <typename Item>
+ static arrow::AsyncGenerator<Item> MakeOrderedGenerator(
+ const arrow::AsyncGenerator<Enumerated<Item>>& unordered_gen) {
+ using Enum = Enumerated<Item>;
+ auto enum_gen = MakeSequencingGenerator(
+ unordered_gen,
+ /*compare=*/[](const Enum& a, const Enum& b) { return a.index >
b.index; },
+ /*is_next=*/[](const Enum& a, const Enum& b) { return a.index + 1 ==
b.index; },
+ /*initial_value=*/Enum{{}, 0});
+ return MakeUnenumeratedGenerator(enum_gen);
+ }
+};
+
+struct RecordBatchSourceNode
+ : public SchemaSourceNode<RecordBatchSourceNode,
RecordBatchSourceNodeOptions> {
+ using RecordBatchSchemaSourceNode =
+ SchemaSourceNode<RecordBatchSourceNode, RecordBatchSourceNodeOptions>;
+
+ using RecordBatchSchemaSourceNode::RecordBatchSchemaSourceNode;
+
+ static Result<ExecNode*> Make(ExecPlan* plan, std::vector<ExecNode*> inputs,
+ const ExecNodeOptions& options) {
+ return RecordBatchSchemaSourceNode::Make(plan, inputs, options);
+ }
+
+ const char* kind_name() const override { return kKindName; }
+
+ static Result<arrow::AsyncGenerator<util::optional<ExecBatch>>>
MakeGenerator(
+ Iterator<std::shared_ptr<RecordBatch>>& batch_it,
+ arrow::internal::Executor* io_executor, const std::shared_ptr<Schema>&
schema) {
+ auto to_exec_batch =
+ [schema](const std::shared_ptr<RecordBatch>& batch) ->
util::optional<ExecBatch> {
+ if (batch == NULLPTR || *batch->schema() != *schema) {
+ return util::nullopt;
+ }
+ return util::optional<ExecBatch>(ExecBatch(*batch));
+ };
+ auto exec_batch_it = MakeMapIterator(to_exec_batch, std::move(batch_it));
+ auto enum_it = MakeEnumeratedIterator(std::move(exec_batch_it));
+ ARROW_ASSIGN_OR_RAISE(auto enum_gen,
+ MakeBackgroundGenerator(std::move(enum_it),
io_executor));
+ return MakeUnenumeratedGenerator(std::move(enum_gen));
Review Comment:
It turns out my solution doesn't solve the problem: as before, I still see
infrequent cases of out-of-order delivery when the IO-context has multiple
threads. Even after examining a fair amount of Arrow code related to
generators, and trying a couple of things, I still have no good idea how to fix
this. On the other hand, it looks like I'm not the only one to have run into
this, because there is an [existing
multi-threaded unit test involving
`SourceNode`](https://github.com/apache/arrow/blob/f184255cbb9bf911ea2a04910f711e1a924b12b8/cpp/src/arrow/compute/exec/plan_test.cc#L1429-L1459)
that [checks result batches match while ignoring their
order](https://github.com/apache/arrow/blob/f184255cbb9bf911ea2a04910f711e1a924b12b8/cpp/src/arrow/compute/exec/plan_test.cc#L1457-L1458).
This suggests that the out-of-order batch problem may originate in
`SourceNode` itself. I won't try to fix this problem in this PR and will
instead resort to ignoring batch order in the unit tests.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]