zhixingheyi-tian commented on code in PR #14353: URL: https://github.com/apache/arrow/pull/14353#discussion_r1067902932
########## cpp/src/parquet/column_reader.cc: ########## @@ -1957,6 +1970,138 @@ class ByteArrayChunkedRecordReader : public TypedRecordReader<ByteArrayType>, typename EncodingTraits<ByteArrayType>::Accumulator accumulator_; }; +class ByteArrayChunkedOptRecordReader : public TypedRecordReader<ByteArrayType>, + virtual public BinaryRecordReader { + public: + ByteArrayChunkedOptRecordReader(const ColumnDescriptor* descr, LevelInfo leaf_info, + ::arrow::MemoryPool* pool) + : TypedRecordReader<ByteArrayType>(descr, leaf_info, pool) { + DCHECK_EQ(descr_->physical_type(), Type::BYTE_ARRAY); + accumulator_.builder.reset(new ::arrow::BinaryBuilder(pool)); + values_ = AllocateBuffer(pool); + offset_ = AllocateBuffer(pool); + } + + ::arrow::ArrayVector GetBuilderChunks() override { + if (uses_opt_) { + std::vector<std::shared_ptr<Buffer>> buffers = {ReleaseIsValid(), ReleaseOffsets(), + ReleaseValues()}; + auto data = std::make_shared<::arrow::ArrayData>( + ::arrow::binary(), values_written(), buffers, null_count()); + + auto chunks = ::arrow::ArrayVector({::arrow::MakeArray(data)}); + return chunks; + } else { + ::arrow::ArrayVector result = accumulator_.chunks; + if (result.size() == 0 || accumulator_.builder->length() > 0) { + std::shared_ptr<::arrow::Array> last_chunk; + PARQUET_THROW_NOT_OK(accumulator_.builder->Finish(&last_chunk)); + result.push_back(std::move(last_chunk)); + } + accumulator_.chunks = {}; + return result; + } + } + + void ReadValuesDense(int64_t values_to_read) override { + if (uses_opt_) { + int64_t num_decoded = this->current_decoder_->DecodeArrowZeroCopy( + static_cast<int>(values_to_read), 0, NULLPTR, + (reinterpret_cast<int32_t*>(offset_->mutable_data()) + values_written_), + values_, 0, &bianry_length_); + DCHECK_EQ(num_decoded, values_to_read); + } else { + int64_t num_decoded = this->current_decoder_->DecodeArrowNonNull( + static_cast<int>(values_to_read), &accumulator_); + CheckNumberDecoded(num_decoded, values_to_read); + ResetValues(); + } + 
} + + void ReadValuesSpaced(int64_t values_to_read, int64_t null_count) override { + if (uses_opt_) { + int64_t num_decoded = this->current_decoder_->DecodeArrowZeroCopy( + static_cast<int>(values_to_read), static_cast<int>(null_count), + valid_bits_->mutable_data(), + (reinterpret_cast<int32_t*>(offset_->mutable_data()) + values_written_), + values_, values_written_, &bianry_length_); + DCHECK_EQ(num_decoded, values_to_read - null_count); + } else { + int64_t num_decoded = this->current_decoder_->DecodeArrow( + static_cast<int>(values_to_read), static_cast<int>(null_count), + valid_bits_->mutable_data(), values_written_, &accumulator_); + CheckNumberDecoded(num_decoded, values_to_read - null_count); + ResetValues(); + } + } + + void ReserveValues(int64_t extra_values) override { + const int64_t new_values_capacity = + UpdateCapacity(values_capacity_, values_written_, extra_values); + if (new_values_capacity > values_capacity_) { + PARQUET_THROW_NOT_OK( + values_->Resize(new_values_capacity * binary_per_row_length_, false)); + PARQUET_THROW_NOT_OK(offset_->Resize((new_values_capacity + 1) * 4, false)); + + auto offset = reinterpret_cast<int32_t*>(offset_->mutable_data()); + offset[0] = 0; + + values_capacity_ = new_values_capacity; + } + if (leaf_info_.HasNullableValues()) { + int64_t valid_bytes_new = bit_util::BytesForBits(values_capacity_); + if (valid_bits_->size() < valid_bytes_new) { + int64_t valid_bytes_old = bit_util::BytesForBits(values_written_); + PARQUET_THROW_NOT_OK(valid_bits_->Resize(valid_bytes_new, false)); + // Avoid valgrind warnings + memset(valid_bits_->mutable_data() + valid_bytes_old, 0, + valid_bytes_new - valid_bytes_old); + } + } + } + std::shared_ptr<ResizableBuffer> ReleaseValues() override { + auto result = values_; + values_ = AllocateBuffer(this->pool_); + values_capacity_ = 0; + return result; + } + std::shared_ptr<ResizableBuffer> ReleaseOffsets() override { + auto result = offset_; + if (ARROW_PREDICT_FALSE(!hasCal_average_len_)) 
{ + auto offsetArr = reinterpret_cast<int32_t*>(offset_->mutable_data()); + const auto first_offset = offsetArr[0]; + const auto last_offset = offsetArr[values_written_]; + int64_t binary_length = last_offset - first_offset; + binary_per_row_length_ = binary_length / values_written_ + 1; + hasCal_average_len_ = true; + } + offset_ = AllocateBuffer(this->pool_); + bianry_length_ = 0; + return result; + } + void ResetValues() { + if (values_written_ > 0) { + // Resize to 0, but do not shrink to fit + PARQUET_THROW_NOT_OK(valid_bits_->Resize(0, false)); Review Comment: Just follow the example here: https://github.com/apache/arrow/blob/fc53ff8c5e2797c1a5a99db7f3aece80dd0b9f3e/cpp/src/parquet/column_reader.cc#L1846-L1851 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org For queries about this service, please contact Infrastructure at: users@infra.apache.org