This is an automated email from the ASF dual-hosted git repository.

bkietz pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow.git


The following commit(s) were added to refs/heads/main by this push:
     new 76c4a6e317 GH-37487: [C++][Parquet] Dataset: Implement sync 
`ParquetFileFormat::GetReader` (#37514)
76c4a6e317 is described below

commit 76c4a6e317436d616ddcd62ca21245bebef6091d
Author: mwish <[email protected]>
AuthorDate: Wed Sep 20 00:29:10 2023 +0800

    GH-37487: [C++][Parquet] Dataset: Implement sync 
`ParquetFileFormat::GetReader` (#37514)
    
    
    
    ### Rationale for this change
    
    As https://github.com/apache/arrow/issues/37487 says: when the thread count == 1, 
the thread might block in `ParquetFileFormat::GetReaderAsync`. That's 
because:
    
    1. `ParquetFileFormat::CountRows` would call `EnsureCompleteMetadata` in 
`io_executor`
    2. `EnsureCompleteMetadata` calls `ParquetFileFormat::GetReader`, which 
dispatches the real request to async mode
    3. `async` is executed in `io_executor`.
    
    Steps 1 and 3 run in the same fixed-size executor, causing a deadlock.
    
    ### What changes are included in this PR?
    
    Implement sync `ParquetFileFormat::GetReader`.
    
    ### Are these changes tested?
    
    Currently not
    
    ### Are there any user-facing changes?
    
    Bugfix
    
    * Closes: #37487
    
    Authored-by: mwish <[email protected]>
    Signed-off-by: Benjamin Kietzman <[email protected]>
---
 cpp/src/arrow/dataset/file_parquet.cc      | 61 ++++++++++++++++++++++++------
 cpp/src/arrow/dataset/file_parquet_test.cc | 25 ++++++++++++
 2 files changed, 74 insertions(+), 12 deletions(-)

diff --git a/cpp/src/arrow/dataset/file_parquet.cc 
b/cpp/src/arrow/dataset/file_parquet.cc
index c30441d911..9d0e8a6515 100644
--- a/cpp/src/arrow/dataset/file_parquet.cc
+++ b/cpp/src/arrow/dataset/file_parquet.cc
@@ -88,6 +88,22 @@ parquet::ArrowReaderProperties MakeArrowReaderProperties(
   return properties;
 }
 
+parquet::ArrowReaderProperties MakeArrowReaderProperties(
+    const ParquetFileFormat& format, const parquet::FileMetaData& metadata,
+    const ScanOptions& options, const ParquetFragmentScanOptions& 
parquet_scan_options) {
+  auto arrow_properties = MakeArrowReaderProperties(format, metadata);
+  arrow_properties.set_batch_size(options.batch_size);
+  // Must be set here since the sync ScanTask handles pre-buffering itself
+  arrow_properties.set_pre_buffer(
+      parquet_scan_options.arrow_reader_properties->pre_buffer());
+  arrow_properties.set_cache_options(
+      parquet_scan_options.arrow_reader_properties->cache_options());
+  arrow_properties.set_io_context(
+      parquet_scan_options.arrow_reader_properties->io_context());
+  arrow_properties.set_use_threads(options.use_threads);
+  return arrow_properties;
+}
+
 template <typename M>
 Result<std::shared_ptr<SchemaManifest>> GetSchemaManifest(
     const M& metadata, const parquet::ArrowReaderProperties& properties) {
@@ -410,13 +426,42 @@ Result<std::shared_ptr<Schema>> 
ParquetFileFormat::Inspect(
 
 Result<std::shared_ptr<parquet::arrow::FileReader>> 
ParquetFileFormat::GetReader(
     const FileSource& source, const std::shared_ptr<ScanOptions>& options) 
const {
-  return GetReaderAsync(source, options, nullptr).result();
+  return GetReader(source, options, /*metadata=*/nullptr);
 }
 
 Result<std::shared_ptr<parquet::arrow::FileReader>> 
ParquetFileFormat::GetReader(
     const FileSource& source, const std::shared_ptr<ScanOptions>& options,
     const std::shared_ptr<parquet::FileMetaData>& metadata) const {
-  return GetReaderAsync(source, options, metadata).result();
+  ARROW_ASSIGN_OR_RAISE(
+      auto parquet_scan_options,
+      GetFragmentScanOptions<ParquetFragmentScanOptions>(kParquetTypeName, 
options.get(),
+                                                         
default_fragment_scan_options));
+  auto properties =
+      MakeReaderProperties(*this, parquet_scan_options.get(), options->pool);
+  ARROW_ASSIGN_OR_RAISE(auto input, source.Open());
+  // `parquet::ParquetFileReader::Open` will not wrap the exception as status,
+  // so using `open_parquet_file` to wrap it.
+  auto open_parquet_file = [&]() -> 
Result<std::unique_ptr<parquet::ParquetFileReader>> {
+    BEGIN_PARQUET_CATCH_EXCEPTIONS
+    auto reader = parquet::ParquetFileReader::Open(std::move(input),
+                                                   std::move(properties), 
metadata);
+    return reader;
+    END_PARQUET_CATCH_EXCEPTIONS
+  };
+
+  auto reader_opt = open_parquet_file();
+  if (!reader_opt.ok()) {
+    return WrapSourceError(reader_opt.status(), source.path());
+  }
+  auto reader = std::move(reader_opt).ValueOrDie();
+
+  std::shared_ptr<parquet::FileMetaData> reader_metadata = reader->metadata();
+  auto arrow_properties =
+      MakeArrowReaderProperties(*this, *reader_metadata, *options, 
*parquet_scan_options);
+  std::unique_ptr<parquet::arrow::FileReader> arrow_reader;
+  RETURN_NOT_OK(parquet::arrow::FileReader::Make(
+      options->pool, std::move(reader), std::move(arrow_properties), 
&arrow_reader));
+  return arrow_reader;
 }
 
 Future<std::shared_ptr<parquet::arrow::FileReader>> 
ParquetFileFormat::GetReaderAsync(
@@ -445,16 +490,8 @@ Future<std::shared_ptr<parquet::arrow::FileReader>> 
ParquetFileFormat::GetReader
         ARROW_ASSIGN_OR_RAISE(std::unique_ptr<parquet::ParquetFileReader> 
reader,
                               reader_fut.MoveResult());
         std::shared_ptr<parquet::FileMetaData> metadata = reader->metadata();
-        auto arrow_properties = MakeArrowReaderProperties(*self, *metadata);
-        arrow_properties.set_batch_size(options->batch_size);
-        // Must be set here since the sync ScanTask handles pre-buffering 
itself
-        arrow_properties.set_pre_buffer(
-            parquet_scan_options->arrow_reader_properties->pre_buffer());
-        arrow_properties.set_cache_options(
-            parquet_scan_options->arrow_reader_properties->cache_options());
-        arrow_properties.set_io_context(
-            parquet_scan_options->arrow_reader_properties->io_context());
-        arrow_properties.set_use_threads(options->use_threads);
+        auto arrow_properties =
+            MakeArrowReaderProperties(*this, *metadata, *options, 
*parquet_scan_options);
         std::unique_ptr<parquet::arrow::FileReader> arrow_reader;
         RETURN_NOT_OK(parquet::arrow::FileReader::Make(options->pool, 
std::move(reader),
                                                        
std::move(arrow_properties),
diff --git a/cpp/src/arrow/dataset/file_parquet_test.cc 
b/cpp/src/arrow/dataset/file_parquet_test.cc
index 42f923f0e6..8527c3af64 100644
--- a/cpp/src/arrow/dataset/file_parquet_test.cc
+++ b/cpp/src/arrow/dataset/file_parquet_test.cc
@@ -18,12 +18,14 @@
 #include "arrow/dataset/file_parquet.h"
 
 #include <memory>
+#include <thread>
 #include <utility>
 #include <vector>
 
 #include "arrow/compute/api_scalar.h"
 #include "arrow/dataset/dataset_internal.h"
 #include "arrow/dataset/test_util_internal.h"
+#include "arrow/io/interfaces.h"
 #include "arrow/io/memory.h"
 #include "arrow/io/test_common.h"
 #include "arrow/io/util_internal.h"
@@ -367,6 +369,29 @@ TEST_F(TestParquetFileFormat, MultithreadedScan) {
   ASSERT_EQ(batches.size(), kNumRowGroups);
 }
 
+TEST_F(TestParquetFileFormat, SingleThreadExecutor) {
+  // Reset capacity for io executor
+  struct PoolResetGuard {
+    int original_capacity = io::GetIOThreadPoolCapacity();
+    ~PoolResetGuard() { 
DCHECK_OK(io::SetIOThreadPoolCapacity(original_capacity)); }
+  } guard;
+  ASSERT_OK(io::SetIOThreadPoolCapacity(1));
+
+  auto reader = GetRecordBatchReader(schema({field("utf8", utf8())}));
+
+  ASSERT_OK_AND_ASSIGN(auto buffer, ParquetFormatHelper::Write(reader.get()));
+  auto buffer_reader = std::make_shared<::arrow::io::BufferReader>(buffer);
+  auto source = std::make_shared<FileSource>(std::move(buffer_reader), 
buffer->size());
+  auto options = std::make_shared<ScanOptions>();
+
+  {
+    auto fragment = MakeFragment(*source);
+    auto count_rows = fragment->CountRows(literal(true), options);
+    ASSERT_OK_AND_ASSIGN(auto result, count_rows.MoveResult());
+    ASSERT_EQ(expected_rows(), result);
+  }
+}
+
 class TestParquetFileSystemDataset : public WriteFileSystemDatasetMixin,
                                      public testing::Test {
  public:

Reply via email to