This is an automated email from the ASF dual-hosted git repository.
bkietz pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow.git
The following commit(s) were added to refs/heads/main by this push:
new 76c4a6e317 GH-37487: [C++][Parquet] Dataset: Implement sync
`ParquetFileFormat::GetReader` (#37514)
76c4a6e317 is described below
commit 76c4a6e317436d616ddcd62ca21245bebef6091d
Author: mwish <[email protected]>
AuthorDate: Wed Sep 20 00:29:10 2023 +0800
GH-37487: [C++][Parquet] Dataset: Implement sync
`ParquetFileFormat::GetReader` (#37514)
### Rationale for this change
As https://github.com/apache/arrow/issues/37487 says: when the thread count == 1,
the thread might block in `ParquetFileFormat::GetReaderAsync`. That's
because:
1. `ParquetFileFormat::CountRows` would call `EnsureCompleteMetadata` in
`io_executor`
2. `EnsureCompleteMetadata` calls `ParquetFileFormat::GetReader`, which
dispatches the real request to async mode
3. `async` is executed in `io_executor`.
Steps 1 and 3 run in the same fixed-size executor, causing a deadlock.
### What changes are included in this PR?
Implement sync `ParquetFileFormat::GetReader`.
### Are these changes tested?
Currently not
### Are there any user-facing changes?
Bugfix
* Closes: #37487
Authored-by: mwish <[email protected]>
Signed-off-by: Benjamin Kietzman <[email protected]>
---
cpp/src/arrow/dataset/file_parquet.cc | 61 ++++++++++++++++++++++++------
cpp/src/arrow/dataset/file_parquet_test.cc | 25 ++++++++++++
2 files changed, 74 insertions(+), 12 deletions(-)
diff --git a/cpp/src/arrow/dataset/file_parquet.cc
b/cpp/src/arrow/dataset/file_parquet.cc
index c30441d911..9d0e8a6515 100644
--- a/cpp/src/arrow/dataset/file_parquet.cc
+++ b/cpp/src/arrow/dataset/file_parquet.cc
@@ -88,6 +88,22 @@ parquet::ArrowReaderProperties MakeArrowReaderProperties(
return properties;
}
+parquet::ArrowReaderProperties MakeArrowReaderProperties(
+ const ParquetFileFormat& format, const parquet::FileMetaData& metadata,
+ const ScanOptions& options, const ParquetFragmentScanOptions&
parquet_scan_options) {
+ auto arrow_properties = MakeArrowReaderProperties(format, metadata);
+ arrow_properties.set_batch_size(options.batch_size);
+ // Must be set here since the sync ScanTask handles pre-buffering itself
+ arrow_properties.set_pre_buffer(
+ parquet_scan_options.arrow_reader_properties->pre_buffer());
+ arrow_properties.set_cache_options(
+ parquet_scan_options.arrow_reader_properties->cache_options());
+ arrow_properties.set_io_context(
+ parquet_scan_options.arrow_reader_properties->io_context());
+ arrow_properties.set_use_threads(options.use_threads);
+ return arrow_properties;
+}
+
template <typename M>
Result<std::shared_ptr<SchemaManifest>> GetSchemaManifest(
const M& metadata, const parquet::ArrowReaderProperties& properties) {
@@ -410,13 +426,42 @@ Result<std::shared_ptr<Schema>>
ParquetFileFormat::Inspect(
Result<std::shared_ptr<parquet::arrow::FileReader>>
ParquetFileFormat::GetReader(
const FileSource& source, const std::shared_ptr<ScanOptions>& options)
const {
- return GetReaderAsync(source, options, nullptr).result();
+ return GetReader(source, options, /*metadata=*/nullptr);
}
Result<std::shared_ptr<parquet::arrow::FileReader>>
ParquetFileFormat::GetReader(
const FileSource& source, const std::shared_ptr<ScanOptions>& options,
const std::shared_ptr<parquet::FileMetaData>& metadata) const {
- return GetReaderAsync(source, options, metadata).result();
+ ARROW_ASSIGN_OR_RAISE(
+ auto parquet_scan_options,
+ GetFragmentScanOptions<ParquetFragmentScanOptions>(kParquetTypeName,
options.get(),
+
default_fragment_scan_options));
+ auto properties =
+ MakeReaderProperties(*this, parquet_scan_options.get(), options->pool);
+ ARROW_ASSIGN_OR_RAISE(auto input, source.Open());
+ // `parquet::ParquetFileReader::Open` will not wrap the exception as status,
+ // so using `open_parquet_file` to wrap it.
+ auto open_parquet_file = [&]() ->
Result<std::unique_ptr<parquet::ParquetFileReader>> {
+ BEGIN_PARQUET_CATCH_EXCEPTIONS
+ auto reader = parquet::ParquetFileReader::Open(std::move(input),
+ std::move(properties),
metadata);
+ return reader;
+ END_PARQUET_CATCH_EXCEPTIONS
+ };
+
+ auto reader_opt = open_parquet_file();
+ if (!reader_opt.ok()) {
+ return WrapSourceError(reader_opt.status(), source.path());
+ }
+ auto reader = std::move(reader_opt).ValueOrDie();
+
+ std::shared_ptr<parquet::FileMetaData> reader_metadata = reader->metadata();
+ auto arrow_properties =
+ MakeArrowReaderProperties(*this, *reader_metadata, *options,
*parquet_scan_options);
+ std::unique_ptr<parquet::arrow::FileReader> arrow_reader;
+ RETURN_NOT_OK(parquet::arrow::FileReader::Make(
+ options->pool, std::move(reader), std::move(arrow_properties),
&arrow_reader));
+ return arrow_reader;
}
Future<std::shared_ptr<parquet::arrow::FileReader>>
ParquetFileFormat::GetReaderAsync(
@@ -445,16 +490,8 @@ Future<std::shared_ptr<parquet::arrow::FileReader>>
ParquetFileFormat::GetReader
ARROW_ASSIGN_OR_RAISE(std::unique_ptr<parquet::ParquetFileReader>
reader,
reader_fut.MoveResult());
std::shared_ptr<parquet::FileMetaData> metadata = reader->metadata();
- auto arrow_properties = MakeArrowReaderProperties(*self, *metadata);
- arrow_properties.set_batch_size(options->batch_size);
- // Must be set here since the sync ScanTask handles pre-buffering
itself
- arrow_properties.set_pre_buffer(
- parquet_scan_options->arrow_reader_properties->pre_buffer());
- arrow_properties.set_cache_options(
- parquet_scan_options->arrow_reader_properties->cache_options());
- arrow_properties.set_io_context(
- parquet_scan_options->arrow_reader_properties->io_context());
- arrow_properties.set_use_threads(options->use_threads);
+ auto arrow_properties =
+ MakeArrowReaderProperties(*this, *metadata, *options,
*parquet_scan_options);
std::unique_ptr<parquet::arrow::FileReader> arrow_reader;
RETURN_NOT_OK(parquet::arrow::FileReader::Make(options->pool,
std::move(reader),
std::move(arrow_properties),
diff --git a/cpp/src/arrow/dataset/file_parquet_test.cc
b/cpp/src/arrow/dataset/file_parquet_test.cc
index 42f923f0e6..8527c3af64 100644
--- a/cpp/src/arrow/dataset/file_parquet_test.cc
+++ b/cpp/src/arrow/dataset/file_parquet_test.cc
@@ -18,12 +18,14 @@
#include "arrow/dataset/file_parquet.h"
#include <memory>
+#include <thread>
#include <utility>
#include <vector>
#include "arrow/compute/api_scalar.h"
#include "arrow/dataset/dataset_internal.h"
#include "arrow/dataset/test_util_internal.h"
+#include "arrow/io/interfaces.h"
#include "arrow/io/memory.h"
#include "arrow/io/test_common.h"
#include "arrow/io/util_internal.h"
@@ -367,6 +369,29 @@ TEST_F(TestParquetFileFormat, MultithreadedScan) {
ASSERT_EQ(batches.size(), kNumRowGroups);
}
+TEST_F(TestParquetFileFormat, SingleThreadExecutor) {
+ // Reset capacity for io executor
+ struct PoolResetGuard {
+ int original_capacity = io::GetIOThreadPoolCapacity();
+ ~PoolResetGuard() {
DCHECK_OK(io::SetIOThreadPoolCapacity(original_capacity)); }
+ } guard;
+ ASSERT_OK(io::SetIOThreadPoolCapacity(1));
+
+ auto reader = GetRecordBatchReader(schema({field("utf8", utf8())}));
+
+ ASSERT_OK_AND_ASSIGN(auto buffer, ParquetFormatHelper::Write(reader.get()));
+ auto buffer_reader = std::make_shared<::arrow::io::BufferReader>(buffer);
+ auto source = std::make_shared<FileSource>(std::move(buffer_reader),
buffer->size());
+ auto options = std::make_shared<ScanOptions>();
+
+ {
+ auto fragment = MakeFragment(*source);
+ auto count_rows = fragment->CountRows(literal(true), options);
+ ASSERT_OK_AND_ASSIGN(auto result, count_rows.MoveResult());
+ ASSERT_EQ(expected_rows(), result);
+ }
+}
+
class TestParquetFileSystemDataset : public WriteFileSystemDatasetMixin,
public testing::Test {
public: