This is an automated email from the ASF dual-hosted git repository.

bkietz pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow.git


The following commit(s) were added to refs/heads/main by this push:
     new 02de3c1789 GH-37917: [Parquet] Add OpenAsync for FileSource (#37918)
02de3c1789 is described below

commit 02de3c1789460304e958936b78d60f824921c250
Author: Eero Lihavainen <[email protected]>
AuthorDate: Wed Oct 4 17:52:04 2023 +0300

    GH-37917: [Parquet] Add OpenAsync for FileSource (#37918)
    
    
    
    ### Rationale for this change
    
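    Improves performance of file reads whose Open operation is expensive.
    Previously the Parquet format called the blocking `FileSource::Open()`
    before starting its asynchronous read, so opening the file blocked the
    calling thread.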
    
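    ### What changes are included in this PR?
    
    Adds `FileSource::OpenAsync()`, and uses it when Parquet file readers
    are created asynchronously:
    - Filesystem-backed sources are opened with the filesystem's
      `OpenInputFileAsync`.
    - Buffer-backed sources return an already-finished future.
    - Sources with a custom open function are still opened synchronously
      (tracked in GH-37962).
    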
    ### Are these changes tested?
    
    ### Are there any user-facing changes?
    No behavioral changes; adds the public method `FileSource::OpenAsync()`.
    
    * Closes: #37917
    
    Authored-by: Eero Lihavainen <[email protected]>
    Signed-off-by: Benjamin Kietzman <[email protected]>
---
 cpp/src/arrow/dataset/file_base.cc    | 14 ++++++++++
 cpp/src/arrow/dataset/file_base.h     |  1 +
 cpp/src/arrow/dataset/file_parquet.cc | 50 ++++++++++++++++++++---------------
 3 files changed, 43 insertions(+), 22 deletions(-)

diff --git a/cpp/src/arrow/dataset/file_base.cc b/cpp/src/arrow/dataset/file_base.cc
index 2fcd57d2f3..6a97b51cf2 100644
--- a/cpp/src/arrow/dataset/file_base.cc
+++ b/cpp/src/arrow/dataset/file_base.cc
@@ -81,6 +81,20 @@ Result<std::shared_ptr<io::RandomAccessFile>> FileSource::Open() const {
   return custom_open_();
 }
 
+Future<std::shared_ptr<io::RandomAccessFile>> FileSource::OpenAsync() const {
+  if (filesystem_) {
+    return filesystem_->OpenInputFileAsync(file_info_);
+  }
+
+  if (buffer_) {
+    return Future<std::shared_ptr<io::RandomAccessFile>>::MakeFinished(
+        std::make_shared<io::BufferReader>(buffer_));
+  }
+
+  // TODO(GH-37962): custom_open_ should not block
+  return 
Future<std::shared_ptr<io::RandomAccessFile>>::MakeFinished(custom_open_());
+}
+
 int64_t FileSource::Size() const {
   if (filesystem_) {
     return file_info_.size();
diff --git a/cpp/src/arrow/dataset/file_base.h b/cpp/src/arrow/dataset/file_base.h
index d33d88e996..46fc8ebc40 100644
--- a/cpp/src/arrow/dataset/file_base.h
+++ b/cpp/src/arrow/dataset/file_base.h
@@ -115,6 +115,7 @@ class ARROW_DS_EXPORT FileSource : public util::EqualityComparable<FileSource> {
 
   /// \brief Get a RandomAccessFile which views this file source
   Result<std::shared_ptr<io::RandomAccessFile>> Open() const;
+  Future<std::shared_ptr<io::RandomAccessFile>> OpenAsync() const;
 
   /// \brief Get the size (in bytes) of the file or buffer
   /// If the file is compressed this should be the compressed (on-disk) size.
diff --git a/cpp/src/arrow/dataset/file_parquet.cc b/cpp/src/arrow/dataset/file_parquet.cc
index 751937e93b..3cad1ddd83 100644
--- a/cpp/src/arrow/dataset/file_parquet.cc
+++ b/cpp/src/arrow/dataset/file_parquet.cc
@@ -479,29 +479,35 @@ Future<std::shared_ptr<parquet::arrow::FileReader>> ParquetFileFormat::GetReader
                                                          
default_fragment_scan_options));
   auto properties =
       MakeReaderProperties(*this, parquet_scan_options.get(), options->pool);
-  ARROW_ASSIGN_OR_RAISE(auto input, source.Open());
-  // TODO(ARROW-12259): workaround since we have Future<(move-only type)>
-  auto reader_fut = parquet::ParquetFileReader::OpenAsync(
-      std::move(input), std::move(properties), metadata);
-  auto path = source.path();
+
   auto self = checked_pointer_cast<const 
ParquetFileFormat>(shared_from_this());
-  return reader_fut.Then(
-      [=](const std::unique_ptr<parquet::ParquetFileReader>&) mutable
-      -> Result<std::shared_ptr<parquet::arrow::FileReader>> {
-        ARROW_ASSIGN_OR_RAISE(std::unique_ptr<parquet::ParquetFileReader> 
reader,
-                              reader_fut.MoveResult());
-        std::shared_ptr<parquet::FileMetaData> metadata = reader->metadata();
-        auto arrow_properties =
-            MakeArrowReaderProperties(*this, *metadata, *options, 
*parquet_scan_options);
-        std::unique_ptr<parquet::arrow::FileReader> arrow_reader;
-        RETURN_NOT_OK(parquet::arrow::FileReader::Make(options->pool, 
std::move(reader),
-                                                       
std::move(arrow_properties),
-                                                       &arrow_reader));
-        return std::move(arrow_reader);
-      },
-      [path](
-          const Status& status) -> 
Result<std::shared_ptr<parquet::arrow::FileReader>> {
-        return WrapSourceError(status, path);
+
+  return source.OpenAsync().Then(
+      [=](const std::shared_ptr<io::RandomAccessFile>& input) mutable {
+        return parquet::ParquetFileReader::OpenAsync(input, 
std::move(properties),
+                                                     metadata)
+            .Then(
+                [=](const std::unique_ptr<parquet::ParquetFileReader>& reader) 
mutable
+                -> Result<std::shared_ptr<parquet::arrow::FileReader>> {
+                  auto arrow_properties = MakeArrowReaderProperties(
+                      *self, *reader->metadata(), *options, 
*parquet_scan_options);
+
+                  std::unique_ptr<parquet::arrow::FileReader> arrow_reader;
+                  RETURN_NOT_OK(parquet::arrow::FileReader::Make(
+                      options->pool,
+                      // TODO(ARROW-12259): workaround since we have 
Future<(move-only
+                      // type)> It *wouldn't* be safe to const_cast reader 
except that
+                      // here we know there are no other waiters on the reader.
+                      
std::move(const_cast<std::unique_ptr<parquet::ParquetFileReader>&>(
+                          reader)),
+                      std::move(arrow_properties), &arrow_reader));
+
+                  return std::move(arrow_reader);
+                },
+                [path = source.path()](const Status& status)
+                    -> Result<std::shared_ptr<parquet::arrow::FileReader>> {
+                  return WrapSourceError(status, path);
+                });
       });
 }
 

Reply via email to