pitrou commented on code in PR #14964: URL: https://github.com/apache/arrow/pull/14964#discussion_r1094306409
########## cpp/src/parquet/page_index.cc: ########## @@ -184,8 +185,294 @@ class OffsetIndexImpl : public OffsetIndex { std::vector<PageLocation> page_locations_; }; +class RowGroupPageIndexReaderImpl : public RowGroupPageIndexReader { + public: + RowGroupPageIndexReaderImpl(::arrow::io::RandomAccessFile* input, + std::shared_ptr<RowGroupMetaData> row_group_metadata, + const ReaderProperties& properties, + int32_t row_group_ordinal, + const RowGroupIndexReadRange& index_read_range, + std::shared_ptr<InternalFileDecryptor> file_decryptor) + : input_(input), + row_group_metadata_(std::move(row_group_metadata)), + properties_(properties), + row_group_ordinal_(row_group_ordinal), + index_read_range_(index_read_range), + file_decryptor_(std::move(file_decryptor)) {} + + /// Read column index of a column chunk. + std::shared_ptr<ColumnIndex> GetColumnIndex(int32_t i) override { + if (i < 0 || i >= row_group_metadata_->num_columns()) { + throw ParquetException("Invalid column index at column ordinal ", i); + } + + auto col_chunk = row_group_metadata_->ColumnChunk(i); + std::unique_ptr<ColumnCryptoMetaData> crypto_metadata = col_chunk->crypto_metadata(); + if (crypto_metadata != nullptr) { + ParquetException::NYI("Cannot read encrypted column index yet"); + } + + auto column_index_location = col_chunk->GetColumnIndexLocation(); + if (!column_index_location.has_value()) { + return nullptr; + } + + CheckReadRangeOrThrow(*column_index_location, index_read_range_.column_index, + row_group_ordinal_); + + if (column_index_buffer_ == nullptr) { + PARQUET_ASSIGN_OR_THROW(column_index_buffer_, + input_->ReadAt(index_read_range_.column_index->offset, + index_read_range_.column_index->length)); + } + + int64_t buffer_offset = + column_index_location->offset - index_read_range_.column_index->offset; + // ColumnIndex::Make() requires the type of serialized thrift message to be + // uint32_t + uint32_t length = static_cast<uint32_t>(column_index_location->length); + auto descr = 
row_group_metadata_->schema()->Column(i); + return ColumnIndex::Make(*descr, column_index_buffer_->data() + buffer_offset, length, + properties_); + } + + /// Read offset index of a column chunk. + std::shared_ptr<OffsetIndex> GetOffsetIndex(int32_t i) override { + if (i < 0 || i >= row_group_metadata_->num_columns()) { + throw ParquetException("Invalid offset index at column ordinal ", i); + } + + auto col_chunk = row_group_metadata_->ColumnChunk(i); + std::unique_ptr<ColumnCryptoMetaData> crypto_metadata = col_chunk->crypto_metadata(); + if (crypto_metadata != nullptr) { + ParquetException::NYI("Cannot read encrypted offset index yet"); + } + + auto offset_index_location = col_chunk->GetOffsetIndexLocation(); + if (!offset_index_location.has_value()) { + return nullptr; + } + + CheckReadRangeOrThrow(*offset_index_location, index_read_range_.offset_index, + row_group_ordinal_); + + if (offset_index_buffer_ == nullptr) { + PARQUET_ASSIGN_OR_THROW(offset_index_buffer_, + input_->ReadAt(index_read_range_.offset_index->offset, + index_read_range_.offset_index->length)); + } + + int64_t buffer_offset = + offset_index_location->offset - index_read_range_.offset_index->offset; + // OffsetIndex::Make() requires the type of serialized thrift message to be + // uint32_t + uint32_t length = static_cast<uint32_t>(offset_index_location->length); + return OffsetIndex::Make(offset_index_buffer_->data() + buffer_offset, length, + properties_); + } + + private: + static void CheckReadRangeOrThrow( + const IndexLocation& index_location, + const std::optional<::arrow::io::ReadRange>& index_read_range, + int32_t row_group_ordinal) { + if (!index_read_range.has_value()) { + throw ParquetException("Missing page index read range of row group ", + row_group_ordinal, + ", it may not exist or has not been requested"); + } + + /// The coalesced read range is invalid. 
+ if (index_read_range->offset < 0 || index_read_range->length <= 0) { + throw ParquetException("Invalid page index read range: offset ", + index_read_range->offset, " length ", + index_read_range->length); + } + + /// The location to page index itself is corrupted. + if (index_location.offset < 0 || index_location.length <= 0) { + throw ParquetException("Invalid page index location: offset ", + index_location.offset, " length ", index_location.length); + } + + /// Page index location must be within the range of the read range. + if (index_location.offset < index_read_range->offset || + index_location.offset + index_location.length > + index_read_range->offset + index_read_range->length) { + throw ParquetException("Page index location [offset:", index_location.offset, + ",length:", index_location.length, + "] is out of range from previous WillNeed request [offset:", + index_read_range->offset, + ",length:", index_read_range->length, + "], row group: ", row_group_ordinal); + } + } + + private: + /// The input stream that can perform random access read. + ::arrow::io::RandomAccessFile* input_; + + /// The row group metadata to get column chunk metadata. + std::shared_ptr<RowGroupMetaData> row_group_metadata_; + + /// Reader properties used to deserialize thrift object. + const ReaderProperties& properties_; + + /// The ordinal of the row group in the file. + int32_t row_group_ordinal_; + + /// File offsets and sizes of the page Index of all column chunks in the row group. + RowGroupIndexReadRange index_read_range_; + + /// File-level decryptor. + std::shared_ptr<InternalFileDecryptor> file_decryptor_; + + /// Buffer to hold the raw bytes of the page index. + /// Will be set lazily when the corresponding page index is accessed for the 1st time. 
+ std::shared_ptr<::arrow::Buffer> column_index_buffer_; + std::shared_ptr<::arrow::Buffer> offset_index_buffer_; +}; + +class PageIndexReaderImpl : public PageIndexReader { + public: + PageIndexReaderImpl(::arrow::io::RandomAccessFile* input, + std::shared_ptr<FileMetaData> file_metadata, + const ReaderProperties& properties, + std::shared_ptr<InternalFileDecryptor> file_decryptor) + : input_(input), + file_metadata_(std::move(file_metadata)), + properties_(properties), + file_decryptor_(std::move(file_decryptor)) {} + + std::shared_ptr<RowGroupPageIndexReader> RowGroup(int i) override { + if (i < 0 || i >= file_metadata_->num_row_groups()) { + throw ParquetException("Invalid row group ordinal: ", i); + } + + auto row_group_metadata = file_metadata_->RowGroup(i); + + // Find the read range of the page index of the row group if provided by WillNeed() + RowGroupIndexReadRange index_read_range; + auto iter = index_read_ranges_.find(i); + if (iter != index_read_ranges_.cend()) { + /// This row group has been requested by WillNeed(). Only column index and/or + /// offset index of requested columns can be read. + index_read_range = iter->second; + } else { + /// If the row group has not been requested by WillNeed(), by default both column + /// index and offset index of all column chunks for the row group can be read. + index_read_range = + PageIndexReader::DeterminePageIndexRangesInRowGroup(*row_group_metadata, {}); + } + + if (index_read_range.column_index.has_value() || + index_read_range.offset_index.has_value()) { + return std::make_shared<RowGroupPageIndexReaderImpl>( + input_, std::move(row_group_metadata), properties_, i, index_read_range, + file_decryptor_); + } + + /// The row group does not has page index or has not been requested by WillNeed(). + /// Simply returns nullptr. 
+ return nullptr; + } + + void WillNeed(const std::vector<int32_t>& row_group_indices, + const std::vector<int32_t>& column_indices, + const IndexSelection& index_selection) override { + std::vector<::arrow::io::ReadRange> read_ranges; + for (int32_t row_group_ordinal : row_group_indices) { + auto read_range = PageIndexReader::DeterminePageIndexRangesInRowGroup( + *file_metadata_->RowGroup(row_group_ordinal), column_indices); + if (index_selection.column_index && read_range.column_index.has_value()) { + read_ranges.push_back(*read_range.column_index); + } else { + // Mark the column index as not requested. + read_range.column_index = std::nullopt; + } + if (index_selection.offset_index && read_range.offset_index.has_value()) { + read_ranges.push_back(*read_range.offset_index); + } else { + // Mark the offset index as not requested. + read_range.offset_index = std::nullopt; + } + index_read_ranges_.emplace(row_group_ordinal, std::move(read_range)); + } + PARQUET_THROW_NOT_OK(input_->WillNeed(read_ranges)); + } + + void WillNotNeed(const std::vector<int32_t>& row_group_indices) override { + for (int32_t row_group_ordinal : row_group_indices) { + index_read_ranges_.erase(row_group_ordinal); + } + } + + private: + /// The input stream that can perform random read. + ::arrow::io::RandomAccessFile* input_; + + /// The file metadata to get row group metadata. + std::shared_ptr<FileMetaData> file_metadata_; + + /// Reader properties used to deserialize thrift object. + const ReaderProperties& properties_; + + /// File-level decrypter. + std::shared_ptr<InternalFileDecryptor> file_decryptor_; + + /// Coalesced read ranges of page index of row groups that have been suggested by + /// WillNeed(). Key is the row group ordinal. 
+ std::unordered_map<int32_t, RowGroupIndexReadRange> index_read_ranges_; +}; + } // namespace +RowGroupIndexReadRange PageIndexReader::DeterminePageIndexRangesInRowGroup( + const RowGroupMetaData& row_group_metadata, const std::vector<int32_t>& columns) { + int64_t ci_start = std::numeric_limits<int64_t>::max(); + int64_t oi_start = std::numeric_limits<int64_t>::max(); + int64_t ci_end = -1; + int64_t oi_end = -1; + + auto merge_range = [](const std::optional<IndexLocation>& index_location, + int64_t* start, int64_t* end) { + if (index_location.has_value()) { + if (index_location->offset < 0 || index_location->length <= 0) { + throw ParquetException("Invalid index location: offset ", index_location->offset, + " length ", index_location->length); + } + *start = std::min(*start, index_location->offset); + *end = std::max(*end, index_location->offset + index_location->length); Review Comment: You can use `AddWithOverflow` for the `index_location->offset + index_location->length` addition on the line above. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: github-unsubscr...@arrow.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org