zhixingheyi-tian commented on code in PR #14353:
URL: https://github.com/apache/arrow/pull/14353#discussion_r1067902932


##########
cpp/src/parquet/column_reader.cc:
##########
@@ -1957,6 +1970,138 @@ class ByteArrayChunkedRecordReader : public TypedRecordReader<ByteArrayType>,
   typename EncodingTraits<ByteArrayType>::Accumulator accumulator_;
 };
 
+class ByteArrayChunkedOptRecordReader : public TypedRecordReader<ByteArrayType>,
+                                        virtual public BinaryRecordReader {
+ public:
+  ByteArrayChunkedOptRecordReader(const ColumnDescriptor* descr, LevelInfo leaf_info,
+                                  ::arrow::MemoryPool* pool)
+      : TypedRecordReader<ByteArrayType>(descr, leaf_info, pool) {
+    DCHECK_EQ(descr_->physical_type(), Type::BYTE_ARRAY);
+    accumulator_.builder.reset(new ::arrow::BinaryBuilder(pool));
+    values_ = AllocateBuffer(pool);
+    offset_ = AllocateBuffer(pool);
+  }
+
+  ::arrow::ArrayVector GetBuilderChunks() override {
+    if (uses_opt_) {
+      std::vector<std::shared_ptr<Buffer>> buffers = {ReleaseIsValid(), ReleaseOffsets(),
+                                                      ReleaseValues()};
+      auto data = std::make_shared<::arrow::ArrayData>(
+          ::arrow::binary(), values_written(), buffers, null_count());
+
+      auto chunks = ::arrow::ArrayVector({::arrow::MakeArray(data)});
+      return chunks;
+    } else {
+      ::arrow::ArrayVector result = accumulator_.chunks;
+      if (result.size() == 0 || accumulator_.builder->length() > 0) {
+        std::shared_ptr<::arrow::Array> last_chunk;
+        PARQUET_THROW_NOT_OK(accumulator_.builder->Finish(&last_chunk));
+        result.push_back(std::move(last_chunk));
+      }
+      accumulator_.chunks = {};
+      return result;
+    }
+  }
+
+  void ReadValuesDense(int64_t values_to_read) override {
+    if (uses_opt_) {
+      int64_t num_decoded = this->current_decoder_->DecodeArrowZeroCopy(
+          static_cast<int>(values_to_read), 0, NULLPTR,
+          (reinterpret_cast<int32_t*>(offset_->mutable_data()) + values_written_),
+          values_, 0, &bianry_length_);
+      DCHECK_EQ(num_decoded, values_to_read);
+    } else {
+      int64_t num_decoded = this->current_decoder_->DecodeArrowNonNull(
+          static_cast<int>(values_to_read), &accumulator_);
+      CheckNumberDecoded(num_decoded, values_to_read);
+      ResetValues();
+    }
+  }
+
+  void ReadValuesSpaced(int64_t values_to_read, int64_t null_count) override {
+    if (uses_opt_) {
+      int64_t num_decoded = this->current_decoder_->DecodeArrowZeroCopy(
+          static_cast<int>(values_to_read), static_cast<int>(null_count),
+          valid_bits_->mutable_data(),
+          (reinterpret_cast<int32_t*>(offset_->mutable_data()) + values_written_),
+          values_, values_written_, &bianry_length_);
+      DCHECK_EQ(num_decoded, values_to_read - null_count);
+    } else {
+      int64_t num_decoded = this->current_decoder_->DecodeArrow(
+          static_cast<int>(values_to_read), static_cast<int>(null_count),
+          valid_bits_->mutable_data(), values_written_, &accumulator_);
+      CheckNumberDecoded(num_decoded, values_to_read - null_count);
+      ResetValues();
+    }
+  }
+
+  void ReserveValues(int64_t extra_values) override {
+    const int64_t new_values_capacity =
+        UpdateCapacity(values_capacity_, values_written_, extra_values);
+    if (new_values_capacity > values_capacity_) {
+      PARQUET_THROW_NOT_OK(
+          values_->Resize(new_values_capacity * binary_per_row_length_, false));
+      PARQUET_THROW_NOT_OK(offset_->Resize((new_values_capacity + 1) * 4, false));
+
+      auto offset = reinterpret_cast<int32_t*>(offset_->mutable_data());
+      offset[0] = 0;
+
+      values_capacity_ = new_values_capacity;
+    }
+    if (leaf_info_.HasNullableValues()) {
+      int64_t valid_bytes_new = bit_util::BytesForBits(values_capacity_);
+      if (valid_bits_->size() < valid_bytes_new) {
+        int64_t valid_bytes_old = bit_util::BytesForBits(values_written_);
+        PARQUET_THROW_NOT_OK(valid_bits_->Resize(valid_bytes_new, false));
+        // Avoid valgrind warnings
+        memset(valid_bits_->mutable_data() + valid_bytes_old, 0,
+               valid_bytes_new - valid_bytes_old);
+      }
+    }
+  }
+  std::shared_ptr<ResizableBuffer> ReleaseValues() override {
+    auto result = values_;
+    values_ = AllocateBuffer(this->pool_);
+    values_capacity_ = 0;
+    return result;
+  }
+  std::shared_ptr<ResizableBuffer> ReleaseOffsets() override {
+    auto result = offset_;
+    if (ARROW_PREDICT_FALSE(!hasCal_average_len_)) {
+      auto offsetArr = reinterpret_cast<int32_t*>(offset_->mutable_data());
+      const auto first_offset = offsetArr[0];
+      const auto last_offset = offsetArr[values_written_];
+      int64_t binary_length = last_offset - first_offset;
+      binary_per_row_length_ = binary_length / values_written_ + 1;
+      hasCal_average_len_ = true;
+    }
+    offset_ = AllocateBuffer(this->pool_);
+    bianry_length_ = 0;
+    return result;
+  }
+  void ResetValues() {
+    if (values_written_ > 0) {
+      // Resize to 0, but do not shrink to fit
+      PARQUET_THROW_NOT_OK(valid_bits_->Resize(0, false));

Review Comment:
   Just follow the existing pattern here: https://github.com/apache/arrow/blob/fc53ff8c5e2797c1a5a99db7f3aece80dd0b9f3e/cpp/src/parquet/column_reader.cc#L1846-L1851
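   For reference, a minimal sketch of the reset pattern those linked lines point at (assuming the base `TypedRecordReader::ResetValues` shape of resizing to zero without shrinking and clearing the bookkeeping counters; the exact members at that revision may differ), adapted to the buffers this reader owns:

   ```cpp
   // Sketch only, not the PR's final code: mirrors the linked base-class
   // pattern of resizing buffers to zero (without shrink-to-fit) and
   // clearing the counters so the reader can be reused for the next batch.
   void ResetValues() {
     if (values_written_ > 0) {
       // Resize to 0, but do not shrink to fit
       PARQUET_THROW_NOT_OK(valid_bits_->Resize(0, /*shrink_to_fit=*/false));
       PARQUET_THROW_NOT_OK(values_->Resize(0, /*shrink_to_fit=*/false));
       values_written_ = 0;
       values_capacity_ = 0;
       null_count_ = 0;
     }
   }
   ```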



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
