> Replacing ... with ... works as expected

This is, I think, because the RecordBatchSourceNode defaults to implicit
ordering (note that RecordBatchSourceNode is a SchemaSourceNode):
```
struct SchemaSourceNode : public SourceNode {
  SchemaSourceNode(ExecPlan* plan, std::shared_ptr<Schema> schema,
                   arrow::AsyncGenerator<std::optional<ExecBatch>> generator)
      : SourceNode(plan, schema, generator, Ordering::Implicit()) {}
```

It seems a little inconsistent that the RecordBatchSourceNode defaults to
implicit ordering and the RecordBatchReader source does not. It would be nice
to fix that (as I described above, it is probably fine to assume an implicit
ordering in many cases).

On Wed, Jul 26, 2023 at 8:18 AM Weston Pace <weston.p...@gmail.com> wrote:

> > I think the key problem is that the input stream is unordered. The
> > input stream is an ArrowArrayStream imported from the python side, and
> > then declared as a "record_batch_reader_source", which is an unordered
> > source node. So the behavior is expected.
> > I think the RecordBatchReaderSourceNodeOptions should add an ordering
> > parameter to indicate the input stream's ordering. Otherwise, I need to
> > convert the record_batch_reader_source into a "record_batch_source"
> > with a record_batch generator.
>
> I agree. Also, keep in mind that there is the "implicit order", which
> means "this source node has some kind of deterministic ordering even if
> it isn't reflected in any single column". In other words, a CSV file,
> for example, is always implicitly ordered (by line number) even if the
> line number isn't a column. This should allow us to do things like "grab
> the first 10 rows" and get the behavior the user expects even if their
> data isn't explicitly ordered. In most cases we can assume that data has
> some kind of batch order. The only time it does not is when the source
> itself is non-deterministic. For example, maybe the source is some kind
> of unordered scan from an external SQL source.
>
> > Also, I'd like to have a discussion on the dataset scanner: does it
> > produce a stable sequence of record batches (as an implicit ordering)
> > when the underlying storage is not changed?
>
> Yes, both the old and the new scan node are capable of doing this. The
> implicit order is given by the order of the fragments in the dataset
> (which we assume will always be consistent and, in the case of a
> FileSystemDataset, it is). In the old scan node you need to set the
> property `require_sequenced_output` on the ScanNodeOptions to true (I
> believe the new scan node will always sequence output, but this property
> may eventually exist there too).
>
> > For my situation, the downstream executor may crash, and it would then
> > request to continue from an intermediate state (with a restart offset).
> > I'd like to use a fetch node to skip the leading rows, but that does
> > not seem like an optimized approach.
>
> Regrettably, the old scan node does not have skip implemented. It is a
> little tricky since we do not have a catalog and thus do not know how
> many rows each file has, so we have to calculate the skip at runtime. I
> am planning to support this in the new scan node.
>
> > Maybe I should inspect the fragments in the dataset, to skip reading
> > unnecessary files, and build a FileSystemDataset on the fly?
>
> Yes, this should work today.
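A minimal sketch of how those last two answers could fit together: trim the
already-processed fragments into a new FileSystemDataset, then scan it with
`require_sequenced_output` set. This is untested and purely illustrative; the
`MakeRestartableScan` name and the `fragments_to_skip` parameter are invented
for the example, and it assumes the dataset "scan" node factory has been
registered:

```
#include <memory>
#include <vector>

#include <arrow/acero/exec_plan.h>
#include <arrow/compute/expression.h>
#include <arrow/dataset/file_base.h>
#include <arrow/dataset/scanner.h>
#include <arrow/result.h>

namespace ds = arrow::dataset;

arrow::Result<arrow::acero::Declaration> MakeRestartableScan(
    const std::shared_ptr<ds::FileSystemDataset>& dataset,
    std::shared_ptr<ds::ScanOptions> scan_options, size_t fragments_to_skip) {
  // The implicit order of a FileSystemDataset is its fragment order, so
  // skipping a prefix of fragments acts as a coarse-grained restart offset.
  std::vector<std::shared_ptr<ds::FileFragment>> remaining;
  ARROW_ASSIGN_OR_RAISE(auto fragment_it, dataset->GetFragments());
  size_t index = 0;
  while (true) {
    ARROW_ASSIGN_OR_RAISE(auto fragment, fragment_it.Next());
    if (fragment == nullptr) break;              // iterator exhausted
    if (index++ < fragments_to_skip) continue;   // already processed
    remaining.push_back(std::static_pointer_cast<ds::FileFragment>(fragment));
  }
  ARROW_ASSIGN_OR_RAISE(
      auto trimmed, ds::FileSystemDataset::Make(
                        dataset->schema(), arrow::compute::literal(true),
                        dataset->format(), dataset->filesystem(),
                        std::move(remaining)));
  // require_sequenced_output=true asks the (old) scan node to emit batches
  // in fragment order.
  return arrow::acero::Declaration{
      "scan", ds::ScanNodeOptions{std::move(trimmed), std::move(scan_options),
                                  /*require_sequenced_output=*/true}};
}
```

Skipping whole fragments is coarser than a row-level restart offset, but it
avoids re-reading files that were already fully processed.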
> On Tue, Jul 25, 2023 at 10:37 PM Wenbo Hu <huwenbo1...@gmail.com> wrote:
>
> > Replacing
> >
> > ```
> > ac::Declaration source{"record_batch_reader_source",
> >     ac::RecordBatchReaderSourceNodeOptions{std::move(input)}};
> > ```
> >
> > with
> >
> > ```
> > ac::RecordBatchSourceNodeOptions rb_source_options{
> >     input->schema(), [input]() {
> >       return arrow::MakeFunctionIterator([input] { return input->Next(); });
> >     }};
> > ac::Declaration source{"record_batch_source", std::move(rb_source_options)};
> > ```
> >
> > works as expected.
> >
> > On Wed, Jul 26, 2023 at 10:22 AM Wenbo Hu <huwenbo1...@gmail.com> wrote:
> > >
> > > Hi,
> > > I'll open an issue for the DeclarationToReader problem.
> > > I think the key problem is that the input stream is unordered. The
> > > input stream is an ArrowArrayStream imported from the python side,
> > > and then declared as a "record_batch_reader_source", which is an
> > > unordered source node. So the behavior is expected.
> > > I think the RecordBatchReaderSourceNodeOptions should add an ordering
> > > parameter to indicate the input stream's ordering. Otherwise, I need
> > > to convert the record_batch_reader_source into a
> > > "record_batch_source" with a record_batch generator.
> > > Also, I'd like to have a discussion on the dataset scanner: does it
> > > produce a stable sequence of record batches (as an implicit ordering)
> > > when the underlying storage is not changed? For my situation, the
> > > downstream executor may crash, and it would then request to continue
> > > from an intermediate state (with a restart offset). I'd like to use a
> > > fetch node to skip the leading rows, but that does not seem like an
> > > optimized approach. Maybe I should inspect the fragments in the
> > > dataset, to skip reading unnecessary files, and build a
> > > FileSystemDataset on the fly?
> > >
> > > On Tue, Jul 25, 2023 at 23:44 Weston Pace <weston.p...@gmail.com> wrote:
> > > >
> > > > > Reading the source code of exec_plan.cc, DeclarationToReader
> > > > > calls DeclarationToRecordBatchGenerator, which ignores the
> > > > > sequence_output parameter in SinkNodeOptions; also, it calls
> > > > > Validate, which should fail if the SinkNodeOptions honored
> > > > > sequence_output. Then it seems that DeclarationToReader cannot
> > > > > follow the input batch order?
> > > >
> > > > These methods should not be ignoring sequence_output. Do you want
> > > > to open a bug? This should be a straightforward one to fix.
> > > >
> > > > > Then how does Substrait work in this scenario? Does it output
> > > > > disorderly as well?
> > > >
> > > > Probably. Much of the internal Substrait testing is probably using
> > > > DeclarationToTable or DeclarationToBatches. Ordered execution
> > > > hasn't been adopted widely yet because the old scanner doesn't set
> > > > the batch index and the new scanner isn't ready yet. This limits
> > > > its usefulness to data that is already in memory (the in-memory
> > > > sources do set the batch index).
> > > >
> > > > I think your understanding of the concept is correct, however. Can
> > > > you share a sample plan that is not working for you? If you use
> > > > DeclarationToTable do you get consistently ordered results?
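That last question can be checked mechanically. A small sketch (untested):
run the same declaration twice via DeclarationToTable and compare the
results. The `ProducesStableOrder` name is invented for the example, and this
only makes sense for sources that can be re-run (a dataset scan or an
in-memory table, not a one-shot reader):

```
#include <arrow/acero/exec_plan.h>
#include <arrow/result.h>
#include <arrow/table.h>

// Runs the plan twice; if the plan is deterministic and ordered, the two
// tables should match row-for-row.
arrow::Result<bool> ProducesStableOrder(arrow::acero::Declaration plan) {
  ARROW_ASSIGN_OR_RAISE(auto first, arrow::acero::DeclarationToTable(plan));
  ARROW_ASSIGN_OR_RAISE(auto second,
                        arrow::acero::DeclarationToTable(std::move(plan)));
  return first->Equals(*second);
}
```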
> > > > On Tue, Jul 25, 2023 at 7:06 AM Wenbo Hu <huwenbo1...@gmail.com> wrote:
> > > >
> > > > > Reading the source code of exec_plan.cc, DeclarationToReader
> > > > > calls DeclarationToRecordBatchGenerator, which ignores the
> > > > > sequence_output parameter in SinkNodeOptions; also, it calls
> > > > > Validate, which should fail if the SinkNodeOptions honored
> > > > > sequence_output. Then it seems that DeclarationToReader cannot
> > > > > follow the input batch order?
> > > > > Then how does Substrait work in this scenario? Does it output
> > > > > disorderly as well?
> > > > >
> > > > > On Tue, Jul 25, 2023 at 19:12 Wenbo Hu <huwenbo1...@gmail.com> wrote:
> > > > > >
> > > > > > Hi,
> > > > > > I'm trying to zip two streams that have the same order but go
> > > > > > through different processing. For example, the original stream
> > > > > > comes with two columns, 'id' and 'age', and splits into two
> > > > > > streams processed distributedly using Acero: 1. hash the 'id'
> > > > > > into a stream with a single column 'bucket_id', and 2. classify
> > > > > > 'age' into ['child', 'teenage', 'adult', ...]. Then zip them
> > > > > > back into a single stream:
> > > > > >
> > > > > > [ 'id' | 'age' | many other columns ]
> > > > > >       |       |            |
> > > > > > ['bucket_id'] ['classify'] |
> > > > > >       |       |            |
> > > > > > [ zipped_stream | many_other_columns ]
> > > > > >
> > > > > > I was expecting both bucket_id and classify to keep the same
> > > > > > order as the original stream before they are zipped.
> > > > > > According to the documentation, "ordered execution" uses
> > > > > > batch_index to indicate the order of batches, but
> > > > > > acero::DeclarationToReader with QueryOptions whose
> > > > > > sequence_output is set to true does not keep the order when the
> > > > > > input stream is not ordered. It doesn't fail during execution
> > > > > > either (bucket_id and classify do not specify any ordering).
> > > > > > How can I make Acero produce a stream that keeps the same order
> > > > > > as the original input?
> > > > > >
> > > > > > --
> > > > > > ---------------------
> > > > > > Best Regards,
> > > > > > Wenbo Hu,
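For reference, a sketch (untested) that puts the thread's conclusions
together: wrap the reader in an implicitly ordered "record_batch_source" and
request sequenced output. The `RunOrdered` name is invented for the example,
and the bucket_id/classify/zip processing is elided:

```
#include <memory>

#include <arrow/acero/exec_plan.h>
#include <arrow/acero/options.h>
#include <arrow/record_batch.h>
#include <arrow/table.h>
#include <arrow/util/iterator.h>

namespace ac = arrow::acero;

arrow::Result<std::shared_ptr<arrow::Table>> RunOrdered(
    std::shared_ptr<arrow::RecordBatchReader> input) {
  // Wrap the reader in a record_batch_source so that the source carries an
  // implicit ordering (see the SchemaSourceNode snippet at the top).
  ac::RecordBatchSourceNodeOptions source_options{
      input->schema(), [input]() {
        return arrow::MakeFunctionIterator([input] { return input->Next(); });
      }};
  ac::Declaration plan{"record_batch_source", std::move(source_options)};
  // ... the bucket_id / classify / zip processing would be declared here ...
  ac::QueryOptions query_options;
  query_options.sequence_output = true;  // deliver batches in batch_index order
  // DeclarationToTable is used here because, per the bug discussed above,
  // DeclarationToReader ignored sequence_output at the time of this thread.
  return ac::DeclarationToTable(std::move(plan), std::move(query_options));
}
```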