fatemehp commented on code in PR #14142:
URL: https://github.com/apache/arrow/pull/14142#discussion_r1007183618


##########
cpp/src/parquet/column_reader.cc:
##########
@@ -1329,14 +1336,231 @@ class TypedRecordReader : public ColumnReaderImplBase<DType>,
     return records_read;
   }
 
+  // Throw away levels from start_levels_position to levels_position_.
+  // Will update levels_position_, levels_written_, and levels_capacity_
+  // accordingly and move the levels to left to fill in the gap.
+  // It will resize the buffer without releasing the memory allocation.
+  void ThrowAwayLevels(int64_t start_levels_position) {
+    ARROW_DCHECK_LE(levels_position_, levels_written_);
+    ARROW_DCHECK_LE(start_levels_position, levels_position_);
+    ARROW_DCHECK_GT(this->max_def_level_, 0);
+    ARROW_DCHECK_NE(def_levels_, nullptr);
+
+    int64_t gap = levels_position_ - start_levels_position;
+    if (gap == 0) return;
+
+    int64_t levels_remaining = levels_written_ - gap;
+    int64_t destination = levels_position_ - gap;
+
+    auto left_shift = [=](::arrow::ResizableBuffer* buffer) {
+      int16_t* data = reinterpret_cast<int16_t*>(buffer->mutable_data());
+      std::copy(data + levels_position_, data + levels_written_, data + destination);
+      PARQUET_THROW_NOT_OK(buffer->Resize(levels_remaining * sizeof(int16_t),
+                                          /*shrink_to_fit=*/false));
+    };
+
+    left_shift(def_levels_.get());
+
+    if (this->max_rep_level_ > 0) {
+      ARROW_DCHECK_NE(rep_levels_, nullptr);
+      left_shift(rep_levels_.get());
+    }
+
+    levels_written_ -= gap;
+    levels_position_ -= gap;
+    levels_capacity_ -= gap;
+  }
+
+  // Skip records that we have in our buffer. This function is only for
+  // non-repeated fields.
+  int64_t SkipRecordsInBufferNonRepeated(int64_t num_records) {
+    ARROW_DCHECK_EQ(this->max_rep_level_, 0);
+    if (!this->has_values_to_process() || num_records == 0) return 0;
+
+    int64_t remaining_records = levels_written_ - levels_position_;
+    int64_t skipped_records = std::min(num_records, remaining_records);
+    int64_t start_levels_position = levels_position_;
+    // Since there is no repetition, number of levels equals number of records.
+    levels_position_ += skipped_records;
+
+    // We skipped the levels by incrementing 'levels_position_'. For values
+    // we do not have a buffer, so we need to read them and throw them away.
+    // First we need to figure out how many present/not-null values there are.
+    std::shared_ptr<::arrow::ResizableBuffer> valid_bits;
+    valid_bits = AllocateBuffer(this->pool_);
+    PARQUET_THROW_NOT_OK(valid_bits->Resize(bit_util::BytesForBits(skipped_records),
+                                            /*shrink_to_fit=*/true));
+    ValidityBitmapInputOutput validity_io;
+    validity_io.values_read_upper_bound = skipped_records;
+    validity_io.valid_bits = valid_bits->mutable_data();
+    validity_io.valid_bits_offset = 0;
+    DefLevelsToBitmap(def_levels() + start_levels_position, skipped_records,
+                      this->leaf_info_, &validity_io);
+    int64_t values_to_read = validity_io.values_read - validity_io.null_count;
+
+    // Now that we have figured out number of values to read, we do not need
+    // these levels anymore. We will remove these values from the buffer.
+    // This requires shifting the levels in the buffer to left. So this will
+    // update levels_position_ and levels_written_.
+    ThrowAwayLevels(start_levels_position);
+    // For values, we do not have them in buffer, so we will read them and
+    // throw them away.
+    ReadAndThrowAwayValues(values_to_read);

Review Comment:
   Done, by throwing the error in the function itself as suggested below.



##########
cpp/src/parquet/column_reader.cc:
##########
@@ -1329,14 +1336,231 @@ class TypedRecordReader : public ColumnReaderImplBase<DType>,
     return records_read;
   }
 
+  // Throw away levels from start_levels_position to levels_position_.
+  // Will update levels_position_, levels_written_, and levels_capacity_
+  // accordingly and move the levels to left to fill in the gap.
+  // It will resize the buffer without releasing the memory allocation.
+  void ThrowAwayLevels(int64_t start_levels_position) {
+    ARROW_DCHECK_LE(levels_position_, levels_written_);
+    ARROW_DCHECK_LE(start_levels_position, levels_position_);
+    ARROW_DCHECK_GT(this->max_def_level_, 0);
+    ARROW_DCHECK_NE(def_levels_, nullptr);
+
+    int64_t gap = levels_position_ - start_levels_position;
+    if (gap == 0) return;
+
+    int64_t levels_remaining = levels_written_ - gap;
+    int64_t destination = levels_position_ - gap;

Review Comment:
   Done.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to