fatemehp commented on code in PR #14142:
URL: https://github.com/apache/arrow/pull/14142#discussion_r989295024
##########
cpp/src/parquet/column_reader.cc:
##########
@@ -1329,6 +1332,185 @@ class TypedRecordReader : public
ColumnReaderImplBase<DType>,
return records_read;
}
+ // Throw away levels from start_levels_position to levels_position_.
+ // Will update levels_position_ and levels_written_ accordingly and move
+ // the levels to left to fill in the gap. It will not shrink the size
+ // of the buffer or overwrite the positions after levels_written_.
+ // This is inefficient, though necessary to consume levels that we have
+ // already read into the buffer and we want to Skip.
+ void ThrowAwayLevels(int64_t start_levels_position) {
+ ARROW_DCHECK_LE(levels_position_, levels_written_);
+ ARROW_DCHECK_LE(start_levels_position, levels_position_);
+ int64_t gap = levels_position_ - start_levels_position;
+
+ for (int64_t i = levels_position_; i < levels_written_; ++i) {
+ *(def_levels() + i - gap) = *(def_levels() + i);
+ *(rep_levels() + i - gap) = *(rep_levels() + i);
+ }
+
+ levels_written_ -= gap;
+ levels_position_ -= gap;
+ }
+
+ // Skip records that we have in our buffer. This function is only for
+ // non-repeated fields.
+ int64_t SkipRecordsInBufferNonRepeated(int64_t num_records) {
+ ARROW_DCHECK_EQ(this->max_rep_level_, 0);
+ if (!this->has_values_to_process()) return 0;
+
+ int64_t remaining_records = levels_written_ - levels_position_;
+ int64_t skipped_records = std::min(num_records, remaining_records);
+ int64_t start_levels_position = levels_position_;
+ // Since there is no repetition, number of levels equals number of records.
+ levels_position_ += skipped_records;
+
+ // We skipped the levels by incrementing 'levels_position_'. For values
+ // we do not have a buffer, so we need to read them and throw them away.
+ // First we need to figure out how many present/not-null values there are.
+ std::shared_ptr<::arrow::ResizableBuffer> valid_bits;
+ valid_bits = AllocateBuffer(this->pool_);
+ PARQUET_THROW_NOT_OK(
+ valid_bits->Resize(bit_util::BytesForBits(skipped_records), true));
+ ValidityBitmapInputOutput validity_io;
+ validity_io.values_read_upper_bound = skipped_records;
+ validity_io.valid_bits = valid_bits->mutable_data();
+ validity_io.valid_bits_offset = 0;
+ DefLevelsToBitmap(def_levels() + start_levels_position,
+ skipped_records,
+ this->leaf_info_, &validity_io);
+ int64_t values_to_read = validity_io.values_read - validity_io.null_count;
+
+ // Now that we have figured out number of values to read, we do not need
+ // these levels anymore. Updates levels_position_ and levels_written.
+ ThrowAwayLevels(start_levels_position);
+ ReadAndThrowAway(values_to_read);
+
+ // Mark the levels as read in the underlying column reader.
+ this->ConsumeBufferedValues(skipped_records);
+
+ return skipped_records;
+ }
+
+ // Skip records for repeated fields. Returns number of skipped records.
+ int64_t SkipRecordsRepeated(int64_t num_records) {
+ ARROW_DCHECK_GT(this->max_rep_level_, 0);
+
+ // For repeated fields, we are technically reading and throwing away the
+ // levels and values since we do not know the record boundaries in advance.
+ // Keep filling the buffer and skipping until we reach the desired number
+ // of records or we run out of values in the column chunk.
+ int64_t skipped_records = 0;
+ int64_t level_batch_size = std::max<int64_t>(kMinLevelBatchSize,
num_records);
+ // If 'at_record_start_' is false, but (skipped_records == num_records), it
+ // means that for the last record that was counted, we have not seen all
+ // of its values yet.
+ while (!at_record_start_ || skipped_records < num_records) {
+ // Is there more data to read in this row group?
+ // HasNextInternal() will advance to the next page if necessary.
+ if (!this->HasNextInternal()) {
+ if (!at_record_start_) {
+ // We ended the row group while inside a record that we haven't seen
+ // the end of yet. So increment the record count for the last record
+ // in the row group
+ ++skipped_records;
+ at_record_start_ = true;
+ }
+ break;
+ }
+
+ // Read some more levels.
+ int64_t batch_size = std::min(level_batch_size,
available_values_current_page());
+ // No more data in column. This must be an empty page.
+ // If we had exhausted the last page, HasNextInternal() must have advanced
+ // to the next page. So there must be available values to process.
+ if (batch_size == 0) {
+ break;
+ }
+
+ // For skip we will read the levels and append them to the end
+ // of the def_levels and rep_levels just like for read.
+ ReserveLevels(batch_size);
+
+ int16_t* def_levels = this->def_levels() + levels_written_;
+ int16_t* rep_levels = this->rep_levels() + levels_written_;
+
+ int64_t levels_read = 0;
+ levels_read = this->ReadDefinitionLevels(batch_size, def_levels);
+ if (this->ReadRepetitionLevels(batch_size, rep_levels) !=
levels_read) {
+ throw ParquetException("Number of decoded rep / def levels did not
match");
+ }
+
+ levels_written_ += levels_read;
+
+ // Look at the buffered levels, delimit them based on
+ // (rep_level == 0), report back how many records are in there, and
+ // fill in how many not-null values (def_level == max_def_level_).
+ // DelimitRecords updates levels_position_.
+ int64_t start_levels_position = levels_position_;
+ int64_t remaining_records = num_records - skipped_records;
+ int64_t values_seen = 0;
+ skipped_records += DelimitRecords(remaining_records, &values_seen);
+ if (ReadAndThrowAway(values_seen) != values_seen) {
+ throw ParquetException("Could not read and throw away requested
values");
+ }
+ // Mark those levels and values as consumed in the underlying page.
+ // This must be done before we throw away levels, since ThrowAwayLevels()
+ // updates levels_position_ and levels_written_.
+ this->ConsumeBufferedValues(levels_position_ - start_levels_position);
+ // Updated levels_position_ and levels_written_.
+ ThrowAwayLevels(start_levels_position);
+ }
+
+ return skipped_records;
+ }
+
+ // Read 'num_values' values and throw them away.
+ int64_t ReadAndThrowAway(int64_t num_values) {
+ int64_t values_left = num_values;
+ int64_t batch_size = kMinLevelBatchSize; // ReadBatch with a smaller
memory footprint
+ int64_t values_read = 0;
+
+ // This will be enough scratch space to accommodate 16-bit levels or any
+ // value type
+ int value_size = type_traits<DType::type_num>::value_byte_size;
+ std::shared_ptr<ResizableBuffer> scratch = AllocateBuffer(
+ this->pool_, batch_size * std::max<int>(sizeof(int16_t), value_size));
+ do {
+ batch_size = std::min<int>(batch_size, values_left);
+ values_read = this->ReadValues(
+ batch_size, reinterpret_cast<T*>(scratch->mutable_data()));
+ values_left -= values_read;
+ } while (values_read > 0 && values_left > 0);
+ return num_values - values_left;
+ }
+
+ int64_t SkipRecords(int64_t num_records) override {
Review Comment:
No, here is what we have now: consider a non-repeated field stored in 10
pages of 100 values each. SkipRecords(900) will skip "decoding" the first
9 pages. It will still read the page headers to find out how many values
there are in each page.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]