This is an automated email from the ASF dual-hosted git repository.
bkietz pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow.git
The following commit(s) were added to refs/heads/main by this push:
new 02de3c1789 GH-37917: [Parquet] Add OpenAsync for FileSource (#37918)
02de3c1789 is described below
commit 02de3c1789460304e958936b78d60f824921c250
Author: Eero Lihavainen <[email protected]>
AuthorDate: Wed Oct 4 17:52:04 2023 +0300
GH-37917: [Parquet] Add OpenAsync for FileSource (#37918)
### Rationale for this change
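Improves the performance of file reads when opening the file is expensive: the Parquet reader no longer blocks on a synchronous `FileSource::Open()` call before starting its asynchronous setup.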
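### What changes are included in this PR?
Adds `FileSource::OpenAsync()`. It opens filesystem-backed sources through the filesystem's `OpenInputFileAsync` and wraps in-memory buffers in an already-finished Future. `ParquetFileFormat` now uses it, so the Parquet file reader is created only after the asynchronous open completes. Sources created with a custom open function are still opened synchronously (tracked in GH-37962).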
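### Are these changes tested?
No new tests are added; the commit modifies only `file_base.cc`, `file_base.h`, and `file_parquet.cc`.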
### Are there any user-facing changes?
No
* Closes: #37917
Authored-by: Eero Lihavainen <[email protected]>
Signed-off-by: Benjamin Kietzman <[email protected]>
---
cpp/src/arrow/dataset/file_base.cc | 14 ++++++++++
cpp/src/arrow/dataset/file_base.h | 1 +
cpp/src/arrow/dataset/file_parquet.cc | 50 ++++++++++++++++++++---------------
3 files changed, 43 insertions(+), 22 deletions(-)
diff --git a/cpp/src/arrow/dataset/file_base.cc b/cpp/src/arrow/dataset/file_base.cc
index 2fcd57d2f3..6a97b51cf2 100644
--- a/cpp/src/arrow/dataset/file_base.cc
+++ b/cpp/src/arrow/dataset/file_base.cc
@@ -81,6 +81,20 @@ Result<std::shared_ptr<io::RandomAccessFile>> FileSource::Open() const {
return custom_open_();
}
+Future<std::shared_ptr<io::RandomAccessFile>> FileSource::OpenAsync() const {  // Asynchronous counterpart of Open(); checks filesystem, then buffer, then custom_open_.
+ if (filesystem_) {  // Filesystem-backed source: delegate to the filesystem's async open.
+ return filesystem_->OpenInputFileAsync(file_info_);
+ }
+
+ if (buffer_) {  // In-memory buffer: nothing to wait on, so return an already-finished Future.
+ return Future<std::shared_ptr<io::RandomAccessFile>>::MakeFinished(
+ std::make_shared<io::BufferReader>(buffer_));
+ }
+
+ // TODO(GH-37962): custom_open_ should not block; for now it is invoked synchronously here and its Result is wrapped in an already-finished Future.
+ return
Future<std::shared_ptr<io::RandomAccessFile>>::MakeFinished(custom_open_());
+}
+
int64_t FileSource::Size() const {
if (filesystem_) {
return file_info_.size();
diff --git a/cpp/src/arrow/dataset/file_base.h b/cpp/src/arrow/dataset/file_base.h
index d33d88e996..46fc8ebc40 100644
--- a/cpp/src/arrow/dataset/file_base.h
+++ b/cpp/src/arrow/dataset/file_base.h
@@ -115,6 +115,7 @@ class ARROW_DS_EXPORT FileSource : public util::EqualityComparable<FileSource> {
/// \brief Get a RandomAccessFile which views this file source
Result<std::shared_ptr<io::RandomAccessFile>> Open() const;
+ Future<std::shared_ptr<io::RandomAccessFile>> OpenAsync() const;  ///< Asynchronous version of Open(); the returned Future yields the RandomAccessFile.
/// \brief Get the size (in bytes) of the file or buffer
/// If the file is compressed this should be the compressed (on-disk) size.
diff --git a/cpp/src/arrow/dataset/file_parquet.cc b/cpp/src/arrow/dataset/file_parquet.cc
index 751937e93b..3cad1ddd83 100644
--- a/cpp/src/arrow/dataset/file_parquet.cc
+++ b/cpp/src/arrow/dataset/file_parquet.cc
@@ -479,29 +479,35 @@ Future<std::shared_ptr<parquet::arrow::FileReader>> ParquetFileFormat::GetReader
default_fragment_scan_options));
auto properties =
MakeReaderProperties(*this, parquet_scan_options.get(), options->pool);
- ARROW_ASSIGN_OR_RAISE(auto input, source.Open());
- // TODO(ARROW-12259): workaround since we have Future<(move-only type)>
- auto reader_fut = parquet::ParquetFileReader::OpenAsync(
- std::move(input), std::move(properties), metadata);
- auto path = source.path();
+
auto self = checked_pointer_cast<const
ParquetFileFormat>(shared_from_this());
- return reader_fut.Then(
- [=](const std::unique_ptr<parquet::ParquetFileReader>&) mutable
- -> Result<std::shared_ptr<parquet::arrow::FileReader>> {
- ARROW_ASSIGN_OR_RAISE(std::unique_ptr<parquet::ParquetFileReader>
reader,
- reader_fut.MoveResult());
- std::shared_ptr<parquet::FileMetaData> metadata = reader->metadata();
- auto arrow_properties =
- MakeArrowReaderProperties(*this, *metadata, *options,
*parquet_scan_options);
- std::unique_ptr<parquet::arrow::FileReader> arrow_reader;
- RETURN_NOT_OK(parquet::arrow::FileReader::Make(options->pool,
std::move(reader),
-
std::move(arrow_properties),
- &arrow_reader));
- return std::move(arrow_reader);
- },
- [path](
- const Status& status) ->
Result<std::shared_ptr<parquet::arrow::FileReader>> {
- return WrapSourceError(status, path);
+
+ return source.OpenAsync().Then(
+ [=](const std::shared_ptr<io::RandomAccessFile>& input) mutable {
+ return parquet::ParquetFileReader::OpenAsync(input,
std::move(properties),
+ metadata)
+ .Then(
+ [=](const std::unique_ptr<parquet::ParquetFileReader>& reader)
mutable
+ -> Result<std::shared_ptr<parquet::arrow::FileReader>> {
+ auto arrow_properties = MakeArrowReaderProperties(
+ *self, *reader->metadata(), *options,
*parquet_scan_options);
+
+ std::unique_ptr<parquet::arrow::FileReader> arrow_reader;
+ RETURN_NOT_OK(parquet::arrow::FileReader::Make(
+ options->pool,
+ // TODO(ARROW-12259): workaround since we have
Future<(move-only
+ // type)> It *wouldn't* be safe to const_cast reader
except that
+ // here we know there are no other waiters on the reader.
+
std::move(const_cast<std::unique_ptr<parquet::ParquetFileReader>&>(
+ reader)),
+ std::move(arrow_properties), &arrow_reader));
+
+ return std::move(arrow_reader);
+ },
+ [path = source.path()](const Status& status)
+ -> Result<std::shared_ptr<parquet::arrow::FileReader>> {
+ return WrapSourceError(status, path);
+ });
});
}