But it would be better to support (start_index, max_rows), so that I don't have to save
a row_index column.
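For now the closest I can get is to express that paging as a filter on the row_index
column, roughly like this (just a sketch; the start/count values and variable names are
illustrative):

```
#include <arrow/api.h>
#include <arrow/compute/api.h>
#include <arrow/dataset/api.h>

using arrow::compute::and_;
using arrow::compute::greater_equal;
using arrow::compute::less;
using arrow::compute::literal;
using arrow::compute::field_ref;

// emulate (start_index, max_rows) paging with the row_index workaround:
// keep rows [start_index, start_index + max_rows)
int64_t start_index = 1000;  // illustrative values
int64_t max_rows = 500;

arrow::compute::Expression paging_filter = and_(
    greater_equal(field_ref("row_index"), literal(start_index)),
    less(field_ref("row_index"), literal(start_index + max_rows)));

// then bind it as usual: scanner_builder->Filter(paging_filter);
```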
------------------ Original Message ------------------
From: "user" <[email protected]>
Sent: Thursday, May 19, 2022, 12:58 AM
To: "user" <[email protected]>
Subject: Re: Can arrow::dataset::Scanner skip a certain number of rows?
It works for me now. I use multi-threaded reading: when no filter is set, reading
batches sequentially is fine, but after setting the filter a previous batch may contain
fewer rows or none at all. I was treating an empty batch as the end of the stream before
reading had actually finished, which led to my coredump. After I removed the
multi-threading, it was OK. Thank you for your enthusiastic guidance.
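In other words, only a null batch means end-of-stream; once a filter is set, a zero-row
batch can legitimately show up. A sketch of the corrected read loop (variable names are
illustrative):

```
std::shared_ptr<arrow::RecordBatch> batch;
// ReadNext() sets batch to nullptr at end of stream; a 0-row batch only means
// the filter removed every row of that batch, so keep reading.
while (batch_reader->ReadNext(&batch).ok() && batch != nullptr) {
  if (batch->num_rows() == 0) continue;
  // deal the batch ...
}
```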
------------------ Original Message ------------------
From: "user" <[email protected]>
Sent: Thursday, May 19, 2022, 12:17 AM
To: "user" <[email protected]>
Subject: Re: Can arrow::dataset::Scanner skip a certain number of rows?
Also, when I added the filter, my program had an unexpected coredump, and I'm now
looking into why. I based my implementation on tfio's code.
------------------ Original Message ------------------
From: "user" <[email protected]>
Sent: Thursday, May 19, 2022, 12:09 AM
To: "user" <[email protected]>
Subject: Re: Can arrow::dataset::Scanner skip a certain number of rows?
I tried the method proposed by Aldrin, but when my offset exceeds a batch length,
ReadNext() returns a batch with 0 rows. That is, after I set the filter, calling
ReadNext() does not directly return the batch at the start of the filtered range; I may
need to call it n times in a row before I get a batch with data, which seems strange to
me. Is there a development plan for this?
------------------ Original Message ------------------
From: "user" <[email protected]>
Sent: Wednesday, May 18, 2022, 11:09 PM
To: "user" <[email protected]>
Subject: Re: Can arrow::dataset::Scanner skip a certain number of rows?
We do not have the option to do this today. However, it is something
we could do a better job of as long as we aren't reading CSV.
Aldrin's workaround is pretty solid, especially if you are reading
parquet and have a row_index column. Parquet statistics filtering
should ensure we are only reading the needed row groups.
We will need to implement something similar for [1] and it seems we
should have a general JIRA for "paging support (start_index & count)"
for datasets but I couldn't find one with a quick search.
[1] https://issues.apache.org/jira/browse/ARROW-15589
On Tue, May 17, 2022 at 10:09 AM Aldrin <[email protected]> wrote:
>
> I think batches are all or nothing as far as reading/deserializing.
However, you can manage a slice of that batch instead of the whole batch in the
<deal the batch> portion. That is, if you have 2 batches with 10 rows each,
and you want to skip rows [10, 15) (0-indexed, inclusive of 10, exclusive of
15), then you can track the first batch in a vector (or handle directly), then
in the 2nd batch you can use `Slice(5)` [1] to track rows [15, 20).
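>
> A minimal sketch of that slicing (the batch variables are illustrative; assume they
were already read from the batch reader):
>
> ```
> // two 10-row batches; we want to skip rows [10, 15)
> std::shared_ptr<arrow::RecordBatch> batch1;  // rows [0, 10)
> std::shared_ptr<arrow::RecordBatch> batch2;  // rows [10, 20)
>
> std::vector<std::shared_ptr<arrow::RecordBatch>> kept;
> kept.push_back(batch1);            // keep rows [0, 10) whole
> kept.push_back(batch2->Slice(5));  // Slice(5) keeps rows [15, 20)
> ```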
>
> Some other approaches might include using the `Take` compute function on a
"super" table or on the particular batch [2], or putting a "row index" column
in your data and using that as a filter, e.g.:
>
> ```
> #include <arrow/api.h>
> #include <arrow/dataset/api.h>
> #include <arrow/compute/api.h>
>
> // for arrow expressions
> using arrow::FieldRef;
> using arrow::compute::Expression;
> using arrow::compute::less;
> using arrow::compute::greater_equal;
> using arrow::compute::or_;
> using arrow::compute::literal;
> using arrow::compute::field_ref;
>
> // exclude rows [10, 15) (include 10, exclude 15, 0-indexed),
> // i.e. keep rows with row_index < 10 or row_index >= 15
> Expression filter_rowstokeep = or_(
>     less(field_ref(FieldRef("row_index")), literal(10)),
>     greater_equal(field_ref(FieldRef("row_index")), literal(15)));
>
> // construct scanner builder as usual
> ...
> scanner_builder->Project(...)
>
> // bind filter to scanner builder
> scanner_builder->Filter(filter_rowstokeep)
>
> // finish and execute as usual
> scanner = scanner_builder->Finish()
> ...
> ```
>
> The above code sample is adapted and simplified from what I do in [3],
which you can refer to if you'd like.
>
> Finally, you can also construct a new table with the row_index column and
then filter that instead, which I think could be fairly efficient, but I
haven't played with the API enough to know the most efficient way. I also
suspect it might be slightly annoying with the existing interface. Either:
> (a) dataset -> table -> table with extra column -> dataset ->
scanner builder with filter as above -> scanner -> finish
> (b) table -> table with extra column -> dataset -> scanner
builder with filter as above -> scanner -> finish
>
> The difference between (a) and (b) above being how you initially read the
data from S3 into memory (either as a dataset, leveraging the dataset
framework, or as tables, probably managing the reads a bit more manually).
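>
> A rough sketch of the "table with extra column" step in route (b), assuming the data
is already in an arrow::Table (variable names are illustrative and error handling is
elided):
>
> ```
> // append a row_index column to the table, then wrap it in a dataset so the
> // filter above can be applied
> arrow::Int64Builder index_builder;
> for (int64_t i = 0; i < table->num_rows(); ++i) {
>   index_builder.Append(i);  // statuses ignored for brevity
> }
> std::shared_ptr<arrow::Array> row_index_array;
> index_builder.Finish(&row_index_array);
>
> auto table_with_index = table->AddColumn(
>     table->num_columns(),
>     arrow::field("row_index", arrow::int64()),
>     std::make_shared<arrow::ChunkedArray>(
>         arrow::ArrayVector{row_index_array})).ValueOrDie();
>
> auto dataset = std::make_shared<arrow::dataset::InMemoryDataset>(table_with_index);
> // then: dataset->NewScan() ... ->Filter(filter_rowstokeep) as above
> ```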
>
>
> <---- references -->
>
> [1]:
https://arrow.apache.org/docs/cpp/api/table.html#_CPPv4NK5arrow11RecordBatch5SliceE7int64_t
> [2]: https://arrow.apache.org/docs/cpp/compute.html#selections
> [3]:
https://github.com/drin/cookbooks/blob/mainline/arrow/projection/project_from_dataset.cpp#L137
>
> Aldrin Montana
> Computer Science PhD Student
> UC Santa Cruz
>
>
> On Tue, May 17, 2022 at 9:33 AM 1057445597 <[email protected]> wrote:
>>
>> Can arrow skip a certain number of lines when reading data? I want to do
distributed training, reading data through arrow; my code is as follows
>>
>> dataset = getDatasetFromS3()
>> scanner_builder = dataset->NewScan()
>> scanner_builder->project()
>> scanner = scanner_builder->finish()
>> batch_reader = scanner->ToBatchReader()
>> current_batch_ = batch_reader->ReadNext()
>> deal the batch ...
>>
>> Can I skip a certain number of lines before calling ReadNext()? Or is
there a skip() interface or an offset() interface?
>>
>>