I tried the method proposed by Aldrin, but when my offset exceeds the
length of a batch, ReadNext() fetches a batch with zero rows. That is, after I
set the filter, a call to ReadNext() does not directly fetch the first batch
covered by the filter; I may need to call ReadNext() n times in a row before I
get a batch with data, which seems strange to me. Is there a development plan
for this?
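
For now I work around it by looping past the empty batches; a minimal sketch
(the helper name NextNonEmptyBatch is my own, not an Arrow API):

```
#include <arrow/api.h>

// Skip zero-row batches so the caller only sees batches with data;
// returns nullptr at end of stream.
arrow::Result<std::shared_ptr<arrow::RecordBatch>> NextNonEmptyBatch(
    arrow::RecordBatchReader& reader) {
  std::shared_ptr<arrow::RecordBatch> batch;
  do {
    ARROW_RETURN_NOT_OK(reader.ReadNext(&batch));
  } while (batch != nullptr && batch->num_rows() == 0);
  return batch;
}
```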
------------------ Original Message ------------------
From: "user" <[email protected]>;
Sent: Wednesday, May 18, 2022, 11:09 PM
To: "user" <[email protected]>;
Subject: Re: Can arrow::dataset::Scanner skip a certain number of rows?
We do not have the option to do this today. However, it is something
we could do a better job of as long as we aren't reading CSV.
Aldrin's workaround is pretty solid, especially if you are reading
parquet and have a row_index column. Parquet statistics filtering
should ensure we are only reading the needed row groups.
We will need to implement something similar for [1] and it seems we
should have a general JIRA for "paging support (start_index & count)"
for datasets but I couldn't find one with a quick search.
[1] https://issues.apache.org/jira/browse/ARROW-15589
On Tue, May 17, 2022 at 10:09 AM Aldrin <[email protected]> wrote:
>
> I think batches are all or nothing as far as reading/deserializing. However, you can manage a slice of that batch instead of the whole batch in the <deal the batch> portion. That is, if you have 2 batches with 10 rows each, and you want to skip rows [10, 15) (0-indexed, inclusive of 10, exclusive of 15), then you can track the first batch in a vector (or handle directly), then in the 2nd batch you can use `Slice(5)` [1] to track rows [15, 20).
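>
> For concreteness, a minimal sketch of that slicing (`first_batch` and `second_batch` stand in for whatever ReadNext() returned):
>
> ```
> #include <arrow/api.h>
> #include <vector>
>
> // Keep batch 1 whole and drop the first 5 rows of batch 2, so the
> // kept pieces skip global rows [10, 15).
> std::vector<std::shared_ptr<arrow::RecordBatch>> kept;
> kept.push_back(first_batch);             // global rows [0, 10)
> kept.push_back(second_batch->Slice(5));  // global rows [15, 20)
> ```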
>
> Some other approaches might include using the `Take` compute function on a "super" table or on the particular batch [2], or putting a "row index" column in your data and using that as a filter, e.g.:
>
> ```
> #include <arrow/api.h>
> #include <arrow/dataset/api.h>
> #include <arrow/compute/api.h>
>
> // for arrow expressions
> using arrow::FieldRef;
> using arrow::compute::Expression;
> using arrow::compute::field_ref;
> using arrow::compute::greater_equal;
> using arrow::compute::less;
> using arrow::compute::literal;
> using arrow::compute::or_;
>
> // exclude rows [10, 15) (include 10, exclude 15, 0-indexed), i.e.
> // keep a row when row_index < 10 or row_index >= 15
> Expression filter_rowstokeep = or_(
>     less(field_ref(FieldRef("row_index")), literal(10)),
>     greater_equal(field_ref(FieldRef("row_index")), literal(15)));
>
> // construct scanner builder as usual
> ...
> scanner_builder->Project(...)
>
> // bind filter to scanner builder
> scanner_builder->Filter(filter_rowstokeep)
>
> // finish and execute as usual
> scanner = scanner_builder->Finish()
> ...
> ```
>
> The above code sample is adapted and simplified from what I do in [3], which you can refer to if you'd like.
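>
> If you go the `Take` route from [2] instead, a minimal sketch might look like this (TakeKeptRows and the hard-coded [10, 15) gap are just for illustration):
>
> ```
> #include <arrow/api.h>
> #include <arrow/compute/api.h>
>
> // Gather only the rows to keep: everything outside [10, 15).
> arrow::Result<std::shared_ptr<arrow::Table>> TakeKeptRows(
>     const std::shared_ptr<arrow::Table>& table) {
>   arrow::Int64Builder builder;
>   for (int64_t i = 0; i < table->num_rows(); ++i) {
>     if (i < 10 || i >= 15) {
>       ARROW_RETURN_NOT_OK(builder.Append(i));
>     }
>   }
>   ARROW_ASSIGN_OR_RAISE(auto indices, builder.Finish());
>   ARROW_ASSIGN_OR_RAISE(auto taken, arrow::compute::Take(table, indices));
>   return taken.table();
> }
> ```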
>
> Finally, you can also construct a new table with the row_index column and then filter that instead, which I think could be fairly efficient, but I haven't played with the API enough to know the most efficient way. I also suspect it might be slightly annoying with the existing interface. Either:
> (a) dataset -> table -> table with extra column -> dataset -> scanner builder with filter as above -> scanner -> finish
> (b) table -> table with extra column -> dataset -> scanner builder with filter as above -> scanner -> finish
>
> The difference between (a) and (b) above being how you initially read the data from S3 into memory (either as a dataset, leveraging the dataset framework, or as tables, probably managing the reads a bit more manually).
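>
> A minimal sketch of the "table -> table with extra column" step (WithRowIndex is an illustrative name, not an Arrow API):
>
> ```
> #include <arrow/api.h>
>
> // Append a monotonically increasing row_index column that the
> // filter above can then reference.
> arrow::Result<std::shared_ptr<arrow::Table>> WithRowIndex(
>     const std::shared_ptr<arrow::Table>& table) {
>   arrow::Int64Builder builder;
>   for (int64_t i = 0; i < table->num_rows(); ++i) {
>     ARROW_RETURN_NOT_OK(builder.Append(i));
>   }
>   ARROW_ASSIGN_OR_RAISE(auto indices, builder.Finish());
>   return table->AddColumn(
>       table->num_columns(), arrow::field("row_index", arrow::int64()),
>       std::make_shared<arrow::ChunkedArray>(indices));
> }
> ```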
>
>
> <---- references -->
>
> [1]: https://arrow.apache.org/docs/cpp/api/table.html#_CPPv4NK5arrow11RecordBatch5SliceE7int64_t
> [2]: https://arrow.apache.org/docs/cpp/compute.html#selections
> [3]: https://github.com/drin/cookbooks/blob/mainline/arrow/projection/project_from_dataset.cpp#L137
>
> Aldrin Montana
> Computer Science PhD Student
> UC Santa Cruz
>
>
> On Tue, May 17, 2022 at 9:33 AM 1057445597 <[email protected]> wrote:
>>
>> Can Arrow skip a certain number of lines when reading data? I want to do distributed training and read data through Arrow; my code is as follows:
>>
>> dataset = getDatasetFromS3()
>> scanner_builder = dataset->NewScan()
>> scanner_builder->project()
>> scanner = scanner_builder->finish()
>> batch_reader = scanner->ToBatchReader()
>> current_batch_ = batch_reader->ReadNext()
>> deal with the batch ...
>>
>> Can I skip a certain number of lines before calling ReadNext()? Or is there a skip() interface or an offset() interface?
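>>
>> For reference, a fuller sketch of this flow with error handling (ScanAll is my own helper name; the reader here comes from Scanner::ToRecordBatchReader in the C++ API):
>>
>> ```
>> #include <arrow/api.h>
>> #include <arrow/dataset/api.h>
>>
>> // Scan a dataset batch by batch; projection via
>> // ScannerBuilder::Project is omitted for brevity.
>> arrow::Status ScanAll(const std::shared_ptr<arrow::dataset::Dataset>& dataset) {
>>   ARROW_ASSIGN_OR_RAISE(auto scanner_builder, dataset->NewScan());
>>   ARROW_ASSIGN_OR_RAISE(auto scanner, scanner_builder->Finish());
>>   ARROW_ASSIGN_OR_RAISE(auto batch_reader, scanner->ToRecordBatchReader());
>>   std::shared_ptr<arrow::RecordBatch> batch;
>>   while (true) {
>>     ARROW_RETURN_NOT_OK(batch_reader->ReadNext(&batch));
>>     if (batch == nullptr) break;  // end of stream
>>     // deal with the batch ...
>>   }
>>   return arrow::Status::OK();
>> }
>> ```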
>>
>>