> Replacing ... with ... works as expected

This is, I think, because the RecordBatchSourceNode defaults to implicit
ordering (note the RecordBatchSourceNode is a SchemaSourceNode):

```
struct SchemaSourceNode : public SourceNode {
  SchemaSourceNode(ExecPlan* plan, std::shared_ptr<Schema> schema,
                   arrow::AsyncGenerator<std::optional<ExecBatch>> generator)
      : SourceNode(plan, schema, generator, Ordering::Implicit()) {}
```

It seems a little inconsistent that the RecordBatchSourceNode defaults to
implicit ordering while the RecordBatchReader source does not.  It would be
nice to make the two consistent (as I described earlier, it is probably OK to
assume an implicit ordering in many cases).
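
To make the idea of implicit ordering concrete, here is a minimal standalone sketch. It does not use the Arrow API: `IndexedBatch`, `StampImplicitOrder` and `Sequencer` are hypothetical names invented for illustration. It assumes a source stamps each batch with its emission position, and that a sequencing sink buffers out-of-order arrivals and releases them in index order.

```cpp
#include <cassert>
#include <cstddef>
#include <map>
#include <string>
#include <utility>
#include <vector>

// Hypothetical stand-in for a batch; this is not an Arrow type.
struct IndexedBatch {
  std::size_t index;    // position assigned by the source, in emission order
  std::string payload;  // placeholder for the batch contents
};

// A source with implicit ordering: each batch is stamped with the position
// at which the source produced it, even though no column carries that order.
std::vector<IndexedBatch> StampImplicitOrder(
    const std::vector<std::string>& payloads) {
  std::vector<IndexedBatch> out;
  out.reserve(payloads.size());
  for (std::size_t i = 0; i < payloads.size(); ++i) {
    out.push_back({i, payloads[i]});
  }
  return out;
}

// A sequencing sink: accepts batches in any order and releases them
// strictly in index order, buffering whatever arrives early.
class Sequencer {
 public:
  // Returns the batches that became ready for emission after this arrival.
  std::vector<IndexedBatch> Push(IndexedBatch batch) {
    assert(batch.index >= next_ && "stale or duplicate batch index");
    const std::size_t index = batch.index;
    pending_.emplace(index, std::move(batch));
    std::vector<IndexedBatch> ready;
    for (auto it = pending_.find(next_); it != pending_.end();
         it = pending_.find(next_)) {
      ready.push_back(std::move(it->second));
      pending_.erase(it);
      ++next_;
    }
    return ready;
  }

 private:
  std::size_t next_ = 0;                         // next index to emit
  std::map<std::size_t, IndexedBatch> pending_;  // early arrivals
};
```

A source that never stamps an index gives the sink nothing to sequence on, which is roughly the situation with the unordered reader source discussed in this thread.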

On Wed, Jul 26, 2023 at 8:18 AM Weston Pace <weston.p...@gmail.com> wrote:

> > I think the key problem is that the input stream is unordered. The
> > input stream is an ArrowArrayStream imported from the Python side and
> > then declared as a "record_batch_reader_source", which is an unordered
> > source node. So the behavior is expected.
> >   I think RecordBatchReaderSourceOptions should add an ordering
> > parameter to indicate the ordering of the input stream. Otherwise, I
> > need to convert the record_batch_reader_source into a
> > "record_batch_source" with a record_batch generator.
>
> I agree.  Also, keep in mind that there is the "implicit order" which
> means "this source node has some kind of deterministic ordering even if it
> isn't reflected in any single column".  In other words, if you have a CSV
> file for example it will always be implicitly ordered (by line number) even
> if the line number isn't a column.  This should allow us to do things like
> "grab the first 10 rows" and get behavior the user expects even if their
> data isn't explicitly ordered.  In most cases we can assume that data has
> some kind of batch order.  The only time it does not is if the source
> itself is non-deterministic.  For example, maybe the source is some kind of
> unordered scan from an external SQL source.
>
> > Also, I'd like to discuss the dataset scanner: does it produce a
> > stable sequence of record batches (as an implicit ordering) when the
> > underlying storage is not changed?
>
> Yes, both the old and new scan nodes are capable of doing this.  The
> implicit order is given by the order of the fragments in the dataset (which
> we assume will always be consistent and, in the case of a
> FileSystemDataset, it is).  In the old scan node you need to set the
> property `require_sequenced_output` on the ScanNodeOptions to true (I
> believe the new scan node will always sequence output but this property may
> eventually exist there too).
>
> > In my situation, the downstream
> > executor may crash, and it would then request to continue from an
> > intermediate state (with a restart offset). I'd like to use a fetch
> > node to skip the leading rows, but that does not seem optimal.
>
> Regrettably the old scan node does not have skip implemented.  It is a
> little tricky since we do not have a catalog and thus do not know how many
> rows every single file has.  So we have to calculate the skip at runtime.
> I am planning to support this in the new scan node.
>
> > Maybe I should inspect the fragments in the dataset to skip reading
> > unnecessary files, and build a FileSystemDataset on the fly?
>
> Yes, this should work today.
>
>
> On Tue, Jul 25, 2023 at 10:37 PM Wenbo Hu <huwenbo1...@gmail.com> wrote:
>
>> Replacing
>> ```
>> ac::Declaration source{"record_batch_reader_source",
>> ac::RecordBatchReaderSourceNodeOptions{std::move(input)}};
>> ```
>> with
>> ```
>> ac::RecordBatchSourceNodeOptions rb_source_options{
>>     input->schema(), [input]() {
>>       return arrow::MakeFunctionIterator([input] { return input->Next(); });
>>     }};
>> ac::Declaration source{"record_batch_source",
>>                        std::move(rb_source_options)};
>> ```
>> works as expected.
>>
>> On Wed, Jul 26, 2023 at 10:22, Wenbo Hu <huwenbo1...@gmail.com> wrote:
>> >
>> > Hi,
>> >   I'll open an issue on the DeclarationToReader problem.
>> >   I think the key problem is that the input stream is unordered. The
>> > input stream is an ArrowArrayStream imported from the Python side and
>> > then declared as a "record_batch_reader_source", which is an unordered
>> > source node. So the behavior is expected.
>> >   I think RecordBatchReaderSourceOptions should add an ordering
>> > parameter to indicate the ordering of the input stream. Otherwise, I
>> > need to convert the record_batch_reader_source into a
>> > "record_batch_source" with a record_batch generator.
>> >   Also, I'd like to discuss the dataset scanner: does it produce a
>> > stable sequence of record batches (as an implicit ordering) when the
>> > underlying storage is not changed? In my situation, the downstream
>> > executor may crash, and it would then request to continue from an
>> > intermediate state (with a restart offset). I'd like to use a fetch
>> > node to skip the leading rows, but that does not seem optimal.
>> > Maybe I should inspect the fragments in the dataset to skip reading
>> > unnecessary files, and build a FileSystemDataset on the fly?
>> >
>> > On Tue, Jul 25, 2023 at 23:44, Weston Pace <weston.p...@gmail.com> wrote:
>> > >
>> > > > Reading the source code of exec_plan.cc, DeclarationToReader calls
>> > > > DeclarationToRecordBatchGenerator, which ignores the sequence_output
>> > > > parameter in SinkNodeOptions. It also calls validate, which should
>> > > > fail if the SinkNodeOptions honors sequence_output. So it seems
>> > > > that DeclarationToReader cannot follow the input batch order?
>> > >
>> > > These methods should not be ignoring sequence_output.  Do you want to
>> > > open a bug?  This should be a straightforward one to fix.
>> > >
>> > > > Then how does Substrait work in this scenario? Does it also produce
>> > > > unordered output?
>> > >
>> > > Probably.  Much of internal Substrait testing is probably using
>> > > DeclarationToTable or DeclarationToBatches.  The ordered execution
>> > > hasn't been adopted widely yet because the old scanner doesn't set the
>> > > batch index and the new scanner isn't ready yet.  This limits the
>> > > usefulness to data that is already in memory (the in-memory sources do
>> > > set the batch index).
>> > >
>> > > I think your understanding of the concept is correct however.  Can you
>> > > share a sample plan that is not working for you?  If you use
>> > > DeclarationToTable do you get consistently ordered results?
>> > >
>> > > On Tue, Jul 25, 2023 at 7:06 AM Wenbo Hu <huwenbo1...@gmail.com> wrote:
>> > >
>> > > > Reading the source code of exec_plan.cc, DeclarationToReader calls
>> > > > DeclarationToRecordBatchGenerator, which ignores the sequence_output
>> > > > parameter in SinkNodeOptions. It also calls validate, which should
>> > > > fail if the SinkNodeOptions honors sequence_output. So it seems
>> > > > that DeclarationToReader cannot follow the input batch order?
>> > > > Then how does Substrait work in this scenario? Does it also produce
>> > > > unordered output?
>> > > >
>> > > > On Tue, Jul 25, 2023 at 19:12, Wenbo Hu <huwenbo1...@gmail.com> wrote:
>> > > > >
>> > > > > Hi,
>> > > > >     I'm trying to zip two streams with the same order but different
>> > > > > processes.
>> > > > >     For example, the original stream comes with two columns, 'id'
>> > > > > and 'age', and is split into two streams that are processed in a
>> > > > > distributed fashion using Acero: 1. hash the 'id' into a stream with
>> > > > > a single column 'bucket_id', and 2. classify 'age' into ['child',
>> > > > > 'teenage', 'adult',...]. The two streams are then zipped into a
>> > > > > single stream.
>> > > > >
>> > > > >        [      'id'      |      'age'      | many other columns]
>> > > > >                 |                  |                        |
>> > > > >        ['bucket_id']   ['classify']                |
>> > > > >                  |                  |                       |
>> > > > >               [zipped_stream | many_other_columns]
>> > > > > I was expecting both bucket_id and classify to keep the same order
>> > > > > as the original stream before they are zipped.
>> > > > > According to the documentation, "ordered execution" uses batch_index
>> > > > > to indicate the order of batches.
>> > > > > But acero::DeclarationToReader with a QueryOptions whose
>> > > > > sequence_output is set to true does not keep the order if the input
>> > > > > stream is not ordered, and it doesn't fail during execution either
>> > > > > (bucket_id and classify do not specify any ordering). How can I
>> > > > > make Acero produce a stream that keeps the order of the original
>> > > > > input?
>> > > > > --
>> > > > > ---------------------
>> > > > > Best Regards,
>> > > > > Wenbo Hu,
>> > > >
>> > > >
>> > > >
>> > > > --
>> > > > ---------------------
>> > > > Best Regards,
>> > > > Wenbo Hu,
>> > > >
>> >
>> >
>> >
>> > --
>> > ---------------------
>> > Best Regards,
>> > Wenbo Hu,
>>
>>
>>
>> --
>> ---------------------
>> Best Regards,
>> Wenbo Hu,
>>
>
