wgtmac commented on PR #49855:
URL: https://github.com/apache/arrow/pull/49855#issuecomment-4807959928
Thanks for updating it! I still think this PR is more complex than
necessary. A few concerns:
- The new eviction API requires callers to manually decide when and what to
evict, which is easy to misuse.
- The implementation tracks evicted state by row group runs, but the API
also accepts `column_indices`, so column-subset eviction can be incorrect or
ambiguous.
- The run-merging / bounding-box logic is subtle, especially with filtered
row groups and coalesced cache entries.
- The comments are quite verbose and sometimes explain the implementation
too mechanically. It makes the patch feel harder to review than the actual idea.
Could we instead make this automatic for the sequential Arrow reader path?
For example, add an opt-in reader property, and after each row group is
decoded, advance a simple watermark over the contiguous completed row groups.
Then evict cache entries ending before the first byte any remaining row group
may need.
That would keep `PreBuffer()` semantics unchanged, avoid a public
row-group/column eviction API, handle readahead safely, and reduce the amount
of Parquet-specific bookkeeping needed.
I asked Codex to draft the patch below to demonstrate my idea. I'm not
suggesting to adopt it as is but just a demo code in case I didn't make it
clear.
<details>
```
diff --git a/cpp/src/arrow/io/caching.cc b/cpp/src/arrow/io/caching.cc
index 41fdd9f781..8305869bcf 100644
--- a/cpp/src/arrow/io/caching.cc
+++ b/cpp/src/arrow/io/caching.cc
@@ -151,8 +151,10 @@ struct ReadRangeCache::Impl {
IOContext ctx;
CacheOptions options;
- // Ordered by offset (so as to find a matching region by binary search)
+ // Ordered by offset (so as to find a matching region by binary search).
+ // Protects entries and the futures stored in entries.
std::vector<RangeCacheEntry> entries;
+ std::mutex mutex;
virtual ~Impl() = default;
@@ -179,14 +181,17 @@ struct ReadRangeCache::Impl {
ranges, internal::CoalesceReadRanges(std::move(ranges),
options.hole_size_limit,
options.range_size_limit));
std::vector<RangeCacheEntry> new_entries = MakeCacheEntries(ranges);
- // Add new entries, themselves ordered by offset
- if (entries.size() > 0) {
- std::vector<RangeCacheEntry> merged(entries.size() +
new_entries.size());
- std::merge(entries.begin(), entries.end(), new_entries.begin(),
new_entries.end(),
- merged.begin());
- entries = std::move(merged);
- } else {
- entries = std::move(new_entries);
+ {
+ std::unique_lock<std::mutex> guard(mutex);
+ // Add new entries, themselves ordered by offset
+ if (entries.size() > 0) {
+ std::vector<RangeCacheEntry> merged(entries.size() +
new_entries.size());
+ std::merge(entries.begin(), entries.end(), new_entries.begin(),
new_entries.end(),
+ merged.begin());
+ entries = std::move(merged);
+ } else {
+ entries = std::move(new_entries);
+ }
}
// Prefetch immediately, regardless of executor availability, if
possible
auto st = file->WillNeed(ranges);
@@ -205,14 +210,20 @@ struct ReadRangeCache::Impl {
return std::make_shared<Buffer>(&byte, 0);
}
- const auto it = std::lower_bound(
- entries.begin(), entries.end(), range,
- [](const RangeCacheEntry& entry, const ReadRange& range) {
- return entry.range.offset + entry.range.length < range.offset +
range.length;
- });
- if (it != entries.end() && it->range.Contains(range)) {
- auto fut = MaybeRead(&*it);
- ARROW_ASSIGN_OR_RAISE(auto buf, fut.result());
+ Future<std::shared_ptr<Buffer>> fut;
+ int64_t slice_offset = 0;
+ {
+ std::unique_lock<std::mutex> guard(mutex);
+ const auto it = std::lower_bound(
+ entries.begin(), entries.end(), range,
+ [](const RangeCacheEntry& entry, const ReadRange& range) {
+ return entry.range.offset + entry.range.length < range.offset +
range.length;
+ });
+ if (it == entries.end() || !it->range.Contains(range)) {
+ return Status::Invalid("ReadRangeCache did not find matching cache
entry");
+ }
+ fut = MaybeRead(&*it);
+ slice_offset = range.offset - it->range.offset;
if (options.lazy && options.prefetch_limit > 0) {
int64_t num_prefetched = 0;
for (auto next_it = it + 1;
@@ -226,19 +237,41 @@ struct ReadRangeCache::Impl {
++num_prefetched;
}
}
- return SliceBuffer(std::move(buf), range.offset - it->range.offset,
range.length);
}
- return Status::Invalid("ReadRangeCache did not find matching cache
entry");
+ ARROW_ASSIGN_OR_RAISE(auto buf, fut.result());
+ return SliceBuffer(std::move(buf), slice_offset, range.length);
}
virtual Future<> Wait() {
std::vector<Future<>> futures;
- for (auto& entry : entries) {
- futures.emplace_back(MaybeRead(&entry));
+ {
+ std::unique_lock<std::mutex> guard(mutex);
+ futures.reserve(entries.size());
+ for (auto& entry : entries) {
+ futures.emplace_back(MaybeRead(&entry));
+ }
}
return AllComplete(futures);
}
+ int64_t EvictEntriesBefore(int64_t end_offset) {
+ int64_t n_evicted = 0;
+ std::unique_lock<std::mutex> guard(mutex);
+ auto it = entries.begin();
+ while (it != entries.end()) {
+ if (it->range.offset >= end_offset) {
+ break;
+ }
+ if (it->range.length <= end_offset - it->range.offset) {
+ it = entries.erase(it);
+ ++n_evicted;
+ } else {
+ ++it;
+ }
+ }
+ return n_evicted;
+ }
+
// Return a Future that completes when the given ranges have been read.
virtual Future<> WaitFor(std::vector<ReadRange> ranges) {
auto end = std::remove_if(ranges.begin(), ranges.end(),
@@ -246,17 +279,21 @@ struct ReadRangeCache::Impl {
ranges.resize(end - ranges.begin());
std::vector<Future<>> futures;
futures.reserve(ranges.size());
- for (auto& range : ranges) {
- const auto it = std::lower_bound(
- entries.begin(), entries.end(), range,
- [](const RangeCacheEntry& entry, const ReadRange& range) {
- return entry.range.offset + entry.range.length < range.offset +
range.length;
- });
- if (it != entries.end() && it->range.Contains(range)) {
- futures.push_back(Future<>(MaybeRead(&*it)));
- } else {
- return Status::Invalid("Range was not requested for caching:
offset=",
- range.offset, " length=", range.length);
+ {
+ std::unique_lock<std::mutex> guard(mutex);
+ for (auto& range : ranges) {
+ const auto it =
+ std::lower_bound(entries.begin(), entries.end(), range,
+ [](const RangeCacheEntry& entry, const
ReadRange& range) {
+ return entry.range.offset +
entry.range.length <
+ range.offset + range.length;
+ });
+ if (it != entries.end() && it->range.Contains(range)) {
+ futures.push_back(Future<>(MaybeRead(&*it)));
+ } else {
+ return Status::Invalid("Range was not requested for caching:
offset=",
+ range.offset, " length=", range.length);
+ }
}
}
return AllComplete(futures);
@@ -339,6 +376,10 @@ Future<> ReadRangeCache::WaitFor(std::vector<ReadRange>
ranges) {
return impl_->WaitFor(std::move(ranges));
}
+int64_t ReadRangeCache::EvictEntriesBefore(int64_t end_offset) {
+ return impl_->EvictEntriesBefore(end_offset);
+}
+
} // namespace internal
} // namespace io
} // namespace arrow
diff --git a/cpp/src/arrow/io/caching.h b/cpp/src/arrow/io/caching.h
index e2b911fafd..55b1a03fc1 100644
--- a/cpp/src/arrow/io/caching.h
+++ b/cpp/src/arrow/io/caching.h
@@ -142,6 +142,14 @@ class ARROW_EXPORT ReadRangeCache {
/// \brief Wait until all given ranges have been cached.
Future<> WaitFor(std::vector<ReadRange> ranges);
+ /// \brief Evict cache entries ending at or before the given offset.
+ ///
+ /// This releases the memory held by those entries. Buffers already
returned
+ /// by Read() remain valid through their shared ownership.
+ ///
+ /// \return Number of cache entries that were evicted.
+ int64_t EvictEntriesBefore(int64_t end_offset);
+
protected:
struct Impl;
struct LazyImpl;
diff --git a/cpp/src/parquet/arrow/reader.cc
b/cpp/src/parquet/arrow/reader.cc
index cc107c1802..9e1fbbb538 100644
--- a/cpp/src/parquet/arrow/reader.cc
+++ b/cpp/src/parquet/arrow/reader.cc
@@ -19,7 +19,9 @@
#include <algorithm>
#include <cstring>
+#include <limits>
#include <memory>
+#include <mutex>
#include <random>
#include <unordered_set>
#include <utility>
@@ -1144,6 +1146,39 @@ Result<std::unique_ptr<RecordBatchReader>>
FileReaderImpl::GetRecordBatchReader(
/// Given a file reader and a list of row groups, this is a generator of
record
/// batch generators (where each sub-generator is the contents of a single
row group).
+class ReadCacheEvictionState {
+ public:
+ explicit ReadCacheEvictionState(std::vector<int64_t> evict_before_offsets)
+ : evict_before_offsets_(std::move(evict_before_offsets)),
+ completed_(evict_before_offsets_.size() - 1, false) {}
+
+ void RowGroupDecoded(ParquetFileReader* reader, size_t row_group_index) {
+ int64_t evict_before = -1;
+ {
+ std::lock_guard<std::mutex> lock(mutex_);
+ if (row_group_index >= completed_.size() ||
completed_[row_group_index]) {
+ return;
+ }
+ completed_[row_group_index] = true;
+ const size_t old_completed_prefix = completed_prefix_;
+ while (completed_prefix_ < completed_.size() &&
completed_[completed_prefix_]) {
+ ++completed_prefix_;
+ }
+ if (completed_prefix_ == old_completed_prefix) {
+ return;
+ }
+ evict_before = evict_before_offsets_[completed_prefix_];
+ }
+ reader->EvictPreBufferedDataBefore(evict_before);
+ }
+
+ private:
+ std::mutex mutex_;
+ std::vector<int64_t> evict_before_offsets_;
+ std::vector<bool> completed_;
+ size_t completed_prefix_ = 0;
+};
+
class RowGroupGenerator {
public:
using RecordBatchGenerator =
@@ -1157,12 +1192,14 @@ class RowGroupGenerator {
explicit RowGroupGenerator(std::shared_ptr<FileReaderImpl> arrow_reader,
::arrow::internal::Executor* cpu_executor,
std::vector<int> row_groups, std::vector<int>
column_indices,
- int64_t min_rows_in_flight)
+ int64_t min_rows_in_flight,
+ std::shared_ptr<ReadCacheEvictionState>
eviction_state)
: arrow_reader_(std::move(arrow_reader)),
cpu_executor_(cpu_executor),
row_groups_(std::move(row_groups)),
column_indices_(std::move(column_indices)),
min_rows_in_flight_(min_rows_in_flight),
+ eviction_state_(std::move(eviction_state)),
rows_in_flight_(0),
index_(0),
readahead_index_(0) {}
@@ -1207,12 +1244,21 @@ class RowGroupGenerator {
} else {
auto ready = reader->parquet_reader()->WhenBuffered({row_group},
column_indices);
if (cpu_executor_) ready = cpu_executor_->TransferAlways(ready);
- row_group_read =
- ready.Then([cpu_executor = cpu_executor_, reader, row_group,
- column_indices = std::move(
- column_indices)]() ->
::arrow::Future<RecordBatchGenerator> {
- return ReadOneRowGroup(cpu_executor, reader, row_group,
column_indices);
- });
+ row_group_read = ready.Then([cpu_executor = cpu_executor_, reader,
row_group,
+ row_group_index, eviction_state =
eviction_state_,
+ column_indices =
std::move(column_indices)]()
+ ->
::arrow::Future<RecordBatchGenerator> {
+ return ReadOneRowGroup(cpu_executor, reader, row_group,
column_indices)
+ .Then([reader, row_group_index, eviction_state =
std::move(eviction_state)](
+ RecordBatchGenerator generator)
+ -> ::arrow::Result<RecordBatchGenerator> {
+ if (eviction_state) {
+ eviction_state->RowGroupDecoded(reader->parquet_reader(),
+ row_group_index);
+ }
+ return std::move(generator);
+ });
+ });
}
in_flight_reads_.push({std::move(row_group_read), num_rows});
}
@@ -1253,6 +1299,7 @@ class RowGroupGenerator {
std::vector<int> row_groups_;
std::vector<int> column_indices_;
int64_t min_rows_in_flight_;
+ std::shared_ptr<ReadCacheEvictionState> eviction_state_;
std::queue<ReadRequest> in_flight_reads_;
int64_t rows_in_flight_;
size_t index_;
@@ -1275,10 +1322,30 @@
FileReaderImpl::GetRecordBatchGenerator(std::shared_ptr<FileReader> reader,
reader_properties_.cache_options());
END_PARQUET_CATCH_EXCEPTIONS
}
+ std::shared_ptr<ReadCacheEvictionState> eviction_state;
+ if (reader_properties_.pre_buffer() &&
reader_properties_.auto_evict_read_cache() &&
+ !column_indices.empty()) {
+ const int64_t no_more_ranges = std::numeric_limits<int64_t>::max();
+ std::vector<int64_t> evict_before_offsets(row_group_indices.size() + 1,
+ no_more_ranges);
+ for (int64_t i = static_cast<int64_t>(row_group_indices.size()) - 1; i
>= 0; --i) {
+ ARROW_ASSIGN_OR_RAISE(
+ auto ranges,
reader_->GetReadRanges({row_group_indices[static_cast<size_t>(i)]},
+ column_indices));
+ int64_t row_group_min_offset = no_more_ranges;
+ for (const auto& range : ranges) {
+ row_group_min_offset = std::min(row_group_min_offset, range.offset);
+ }
+ evict_before_offsets[static_cast<size_t>(i)] = std::min(
+ evict_before_offsets[static_cast<size_t>(i + 1)],
row_group_min_offset);
+ }
+ eviction_state =
+
std::make_shared<ReadCacheEvictionState>(std::move(evict_before_offsets));
+ }
::arrow::AsyncGenerator<RowGroupGenerator::RecordBatchGenerator>
row_group_generator =
RowGroupGenerator(::arrow::internal::checked_pointer_cast<FileReaderImpl>(reader),
cpu_executor, row_group_indices, column_indices,
- rows_to_readahead);
+ rows_to_readahead, std::move(eviction_state));
::arrow::AsyncGenerator<std::shared_ptr<::arrow::RecordBatch>>
concatenated =
::arrow::MakeConcatenatedGenerator(std::move(row_group_generator));
WRAP_ASYNC_GENERATOR(std::move(concatenated));
diff --git a/cpp/src/parquet/file_reader.cc b/cpp/src/parquet/file_reader.cc
index 2f46a5e296..bd6c3f3115 100644
--- a/cpp/src/parquet/file_reader.cc
+++ b/cpp/src/parquet/file_reader.cc
@@ -403,6 +403,13 @@ class SerializedFile : public
ParquetFileReader::Contents {
PARQUET_THROW_NOT_OK(cached_source_->Cache(ranges));
}
+ int64_t EvictPreBufferedDataBefore(int64_t end_offset) {
+ if (!cached_source_) {
+ return 0;
+ }
+ return cached_source_->EvictEntriesBefore(end_offset);
+ }
+
Result<std::vector<::arrow::io::ReadRange>> GetReadRanges(
const std::vector<int>& row_groups, const std::vector<int>&
column_indices,
int64_t hole_size_limit, int64_t range_size_limit) {
@@ -909,6 +916,13 @@ void ParquetFileReader::PreBuffer(const
std::vector<int>& row_groups,
file->PreBuffer(row_groups, column_indices, ctx, options);
}
+int64_t ParquetFileReader::EvictPreBufferedDataBefore(int64_t end_offset) {
+ // Access private methods here
+ SerializedFile* file =
+ ::arrow::internal::checked_cast<SerializedFile*>(contents_.get());
+ return file->EvictPreBufferedDataBefore(end_offset);
+}
+
Result<std::vector<::arrow::io::ReadRange>>
ParquetFileReader::GetReadRanges(
const std::vector<int>& row_groups, const std::vector<int>&
column_indices,
int64_t hole_size_limit, int64_t range_size_limit) {
diff --git a/cpp/src/parquet/file_reader.h b/cpp/src/parquet/file_reader.h
index c42163276c..8662dcc0d9 100644
--- a/cpp/src/parquet/file_reader.h
+++ b/cpp/src/parquet/file_reader.h
@@ -201,6 +201,12 @@ class PARQUET_EXPORT ParquetFileReader {
const ::arrow::io::IOContext& ctx,
const ::arrow::io::CacheOptions& options);
+ /// \brief Release cached entries ending at or before the given offset.
+ ///
+ /// This only affects data cached by PreBuffer(). Buffers already
returned to
+ /// readers remain valid through their shared ownership.
+ int64_t EvictPreBufferedDataBefore(int64_t end_offset);
+
/// Retrieve the list of byte ranges that would need to be read to
retrieve
/// the data for the specified row groups and column indices.
///
diff --git a/cpp/src/parquet/properties.h b/cpp/src/parquet/properties.h
index e2244a1176..abd22683fb 100644
--- a/cpp/src/parquet/properties.h
+++ b/cpp/src/parquet/properties.h
@@ -1151,6 +1151,7 @@ class PARQUET_EXPORT ArrowReaderProperties {
read_dict_indices_(),
batch_size_(kArrowDefaultBatchSize),
pre_buffer_(true),
+ auto_evict_read_cache_(false),
cache_options_(::arrow::io::CacheOptions::LazyDefaults()),
coerce_int96_timestamp_unit_(::arrow::TimeUnit::NANO),
binary_type_(kArrowDefaultBinaryType),
@@ -1235,6 +1236,14 @@ class PARQUET_EXPORT ArrowReaderProperties {
/// Return whether read coalescing is enabled.
bool pre_buffer() const { return pre_buffer_; }
+ /// Set whether to automatically evict pre-buffered read cache entries
while
+ /// consuming row groups sequentially.
+ ///
+ /// Default is false.
+ void set_auto_evict_read_cache(bool auto_evict) { auto_evict_read_cache_
= auto_evict; }
+ /// Return whether the Arrow reader automatically evicts read cache
entries.
+ bool auto_evict_read_cache() const { return auto_evict_read_cache_; }
+
/// Set options for read coalescing. This can be used to tune the
/// implementation for characteristics of different filesystems.
void set_cache_options(::arrow::io::CacheOptions options) {
cache_options_ = options; }
@@ -1300,6 +1309,7 @@ class PARQUET_EXPORT ArrowReaderProperties {
std::unordered_set<int> read_dict_indices_;
int64_t batch_size_;
bool pre_buffer_;
+ bool auto_evict_read_cache_;
::arrow::io::IOContext io_context_;
::arrow::io::CacheOptions cache_options_;
::arrow::TimeUnit::type coerce_int96_timestamp_unit_;
```
</details>
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]