PPParticle opened a new issue, #45116:
URL: https://github.com/apache/arrow/issues/45116
### Describe the bug, including details regarding any error messages,
version, and platform.
I tried to use `arrow::RecordBatchReader` to read multiple row groups from a
Parquet file in parallel. I use `GetRecordBatchReader` to acquire each
RecordBatchReader. However, I noticed that when the number of tasks exceeds the
number of cores, the reading stops at `RETURN_NOT_OK(ReadNext(&batch));`.
The RecordBatchReader only works when the number of tasks is less than the
number of cores.
Here is my code:
```
#include <arrow/api.h>
#include <arrow/io/api.h>
#include <arrow/result.h>
#include <arrow/status.h>
#include <arrow/util/thread_pool.h>
#include <arrow/util/parallel.h>
#include <arrow/util/range.h>
#include <parquet/arrow/reader.h>
#include <parquet/arrow/writer.h>
#include <parquet/column_reader.h>
#include <parquet/column_scanner.h>
#include <parquet/exception.h>
#include <parquet/types.h>
#include <cstring>
#include <chrono>
#include <iostream>
// Submits func(i, inputs[i]) to `executor` once per element of `inputs` and
// returns a future that completes with every task's value, or with an error
// status if submission or unwrapping of the results fails.
template <class FUNCTION, typename T,
          typename R = typename arrow::internal::call_traits::return_type<
              FUNCTION>::ValueType>
arrow::Future<std::vector<R>> ParallelForAsync_test(
    std::vector<T> inputs, FUNCTION&& func,
    arrow::internal::Executor* executor = arrow::internal::GetCpuThreadPool()) {
  const size_t task_count = inputs.size();
  std::vector<arrow::Future<R>> pending(task_count);

  size_t idx = 0;
  while (idx < task_count) {
    // A failed submission is converted into the returned future's status.
    ARROW_ASSIGN_OR_RAISE(pending[idx],
                          executor->Submit(func, idx, std::move(inputs[idx])));
    ++idx;
  }

  // Continuation: turn the per-task Results into one Result of all values.
  auto collect = [](const std::vector<arrow::Result<R>>& results)
      -> arrow::Result<std::vector<R>> {
    return arrow::internal::UnwrapOrRaise(results);
  };
  return All(std::move(pending)).Then(std::move(collect));
}
// Concatenates all chunks of `chunked_array` into one contiguous array.
// Any error reported by arrow::Concatenate is returned to the caller.
arrow::Result<std::shared_ptr<arrow::Array>> ChunkedArrayToArray(
    std::shared_ptr<arrow::ChunkedArray> chunked_array) {
  return arrow::Concatenate(chunked_array->chunks());
}
// Reads all row groups of a Parquet file by splitting them across several
// RecordBatchReaders and draining those readers as tasks on Arrow's CPU
// thread pool.
//
// Parameters:
//   file       - path of the Parquet file to open.
//   batch_size - number of row groups assigned to each reader; must be > 0.
//   size       - out-parameter; set to the number of readers (tasks) created.
//
// Returns Status::Invalid for a non-positive batch_size, otherwise the first
// error raised while opening the file, creating a reader, or running a task.
arrow::Status read_whole_file(std::string file, int batch_size, int &size) {
  if (batch_size <= 0) {
    // The row-group partitioning below steps by batch_size.
    return ::arrow::Status::Invalid("batch_size must be positive, got ",
                                    batch_size);
  }

  ::arrow::MemoryPool* pool = ::arrow::default_memory_pool();
  auto reader_properties = parquet::ReaderProperties(pool);
  reader_properties.set_buffer_size(4096 * 4);
  reader_properties.enable_buffered_stream();

  auto arrow_reader_props = parquet::ArrowReaderProperties();
  arrow_reader_props.set_batch_size(4 * 1024);
  // NOTE(review): the read tasks below are submitted to the CPU thread pool;
  // with use_threads(true) the reader may schedule its own work on that same
  // pool, which is possibly related to the reported stall - TODO confirm.
  arrow_reader_props.set_use_threads(true);

  parquet::arrow::FileReaderBuilder reader_builder;
  ARROW_RETURN_NOT_OK(reader_builder.OpenFile(file, false, reader_properties));
  reader_builder.memory_pool(pool);
  reader_builder.properties(arrow_reader_props);
  std::unique_ptr<parquet::arrow::FileReader> arrow_reader;
  ARROW_ASSIGN_OR_RAISE(arrow_reader, reader_builder.Build());

  const int nrgs = arrow_reader->parquet_reader()->metadata()->num_row_groups();
  auto cpu_executor = ::arrow::internal::GetCpuThreadPool();

  // One reader per consecutive half-open range [begin, end) of row groups.
  // Stepping by range (instead of counting to nrgs / batch_size inclusive)
  // avoids creating a trailing reader with no row groups.
  std::vector<std::shared_ptr<arrow::RecordBatchReader>> vec_reader;
  std::cout << "total has " << nrgs << " rowgroups" << std::endl;
  int task = 0;
  for (int begin = 0; begin < nrgs; ++task) {
    // Written as a subtraction so begin + batch_size cannot overflow.
    const int end = (nrgs - begin > batch_size) ? begin + batch_size : nrgs;
    std::shared_ptr<arrow::RecordBatchReader> rb_reader;
    ARROW_RETURN_NOT_OK(arrow_reader->GetRecordBatchReader(
        arrow::internal::Iota(begin, end), &rb_reader));
    std::cout << "tast " << task << " range [" << begin << "," << end << "]"
              << std::endl;
    vec_reader.emplace_back(std::move(rb_reader));
    begin = end;
  }
  size = static_cast<int>(vec_reader.size());
  std::cout << size << std::endl;

  auto thread_start = std::chrono::high_resolution_clock::now();
  // Task body: materialize one reader into a table, flatten every column to a
  // single array, and print timing. Errors are propagated, not swallowed.
  auto read_recordbatch =
      [thread_start](size_t i,
                     std::shared_ptr<::arrow::RecordBatchReader> reader)
      -> ::arrow::Result<bool> {
    auto io_start = std::chrono::high_resolution_clock::now();
    ARROW_ASSIGN_OR_RAISE(auto table, reader->ToTable());
    // Iterate over the table's own column count so table->column(col) is
    // always in range.
    const int table_columns = table->num_columns();
    std::vector<std::shared_ptr<::arrow::Array>> vec_array;
    vec_array.reserve(table_columns);
    for (int col = 0; col < table_columns; col++) {
      ARROW_ASSIGN_OR_RAISE(auto array,
                            ChunkedArrayToArray(table->column(col)));
      vec_array.emplace_back(std::move(array));
    }
    auto io_end = std::chrono::high_resolution_clock::now();
    ARROW_RETURN_NOT_OK(reader->Close());
    // Guard against a table without columns before touching vec_array[0].
    const auto rows = vec_array.empty() ? 0 : vec_array[0]->length();
    std::cout << "thread " << i << " " << rows << " rows "
              << "start_overhead "
              << std::chrono::duration<double, std::milli>(io_start -
                                                           thread_start)
                     .count()
              << " ms "
              << "io_overhead "
              << std::chrono::duration<double, std::milli>(io_end - io_start)
                     .count()
              << " ms" << std::endl;
    return true;
  };

  // MoveResult() blocks until every task has finished; hand the outcome back
  // as a Status instead of aborting the process on failure.
  auto re = ParallelForAsync_test(std::move(vec_reader), read_recordbatch,
                                  cpu_executor)
                .MoveResult();
  return re.status();
}
int main(int argc, char* argv[]) {
std::string file = argv[1];
int batch_size = atoi(argv[2]);
int size = 0;
auto start = std::chrono::steady_clock::now();
read_whole_file(file, batch_size, size);
auto end = std::chrono::steady_clock::now();
printf("%d thread(s) %f ms\n", size, std::chrono::duration<double,
std::milli>(end-start).count());
return 0;
}
```
### Component(s)
C++
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]