zhixingheyi-tian commented on code in PR #14353:
URL: https://github.com/apache/arrow/pull/14353#discussion_r1067894864


##########
cpp/src/parquet/column_reader.cc:
##########
@@ -1957,6 +1970,138 @@ class ByteArrayChunkedRecordReader : public 
TypedRecordReader<ByteArrayType>,
   typename EncodingTraits<ByteArrayType>::Accumulator accumulator_;
 };
 
+class ByteArrayChunkedOptRecordReader : public 
TypedRecordReader<ByteArrayType>,
+                                        virtual public BinaryRecordReader {
+ public:
+  // Construct a reader for a BYTE_ARRAY column.
+  //
+  // descr, leaf_info and pool are forwarded to TypedRecordReader<ByteArrayType>.
+  // The accumulator gets a new ::arrow::BinaryBuilder created with pool, and
+  // the offset and value buffers are obtained from AllocateBuffer(pool).
+  ByteArrayChunkedOptRecordReader(const ColumnDescriptor* descr, LevelInfo leaf_info,
+                                  ::arrow::MemoryPool* pool)
+      : TypedRecordReader<ByteArrayType>(descr, leaf_info, pool) {
+    DCHECK_EQ(descr_->physical_type(), Type::BYTE_ARRAY);
+    offset_ = AllocateBuffer(pool);
+    values_ = AllocateBuffer(pool);
+    accumulator_.builder.reset(new ::arrow::BinaryBuilder(pool));
+  }
+
+  // Return the values accumulated so far as a vector of arrays.
+  //
+  // Builder path (uses_opt_ is false): returns a copy of accumulator_.chunks,
+  // appending the builder's pending contents when there are no chunks yet or
+  // the builder is non-empty, then clears accumulator_.chunks.
+  // Zero-copy path (uses_opt_ is true): releases the validity, offset and value
+  // buffers (in that order) and wraps them in a single ::arrow::binary() array.
+  ::arrow::ArrayVector GetBuilderChunks() override {
+    if (!uses_opt_) {
+      ::arrow::ArrayVector finished = accumulator_.chunks;
+      const bool flush_builder = finished.empty() || accumulator_.builder->length() > 0;
+      if (flush_builder) {
+        std::shared_ptr<::arrow::Array> tail;
+        PARQUET_THROW_NOT_OK(accumulator_.builder->Finish(&tail));
+        finished.push_back(std::move(tail));
+      }
+      accumulator_.chunks = {};
+      return finished;
+    }
+
+    // Braced initialization evaluates the Release*() calls left to right.
+    std::vector<std::shared_ptr<Buffer>> array_buffers = {ReleaseIsValid(), ReleaseOffsets(),
+                                                          ReleaseValues()};
+    auto array_data = std::make_shared<::arrow::ArrayData>(
+        ::arrow::binary(), values_written(), array_buffers, null_count());
+    return ::arrow::ArrayVector({::arrow::MakeArray(array_data)});
+  }
+
+  // Decode values_to_read values for a run that contains no nulls.
+  //
+  // Zero-copy path (uses_opt_ is true): the decoder is handed a pointer into
+  // offset_ at entry values_written_, the values_ buffer and the address of
+  // bianry_length_.
+  // Builder path: values are decoded into accumulator_ and ResetValues() is
+  // called afterwards.
+  // Both paths pass the decoded count to CheckNumberDecoded, so a short decode
+  // is handled the same way regardless of which path is active.
+  void ReadValuesDense(int64_t values_to_read) override {
+    if (uses_opt_) {
+      int32_t* offsets_out =
+          reinterpret_cast<int32_t*>(offset_->mutable_data()) + values_written_;
+      int64_t num_decoded = this->current_decoder_->DecodeArrowZeroCopy(
+          static_cast<int>(values_to_read), 0, NULLPTR, offsets_out, values_, 0,
+          &bianry_length_);
+      // Use the same check as the builder path rather than a debug-only DCHECK.
+      CheckNumberDecoded(num_decoded, values_to_read);
+    } else {
+      int64_t num_decoded = this->current_decoder_->DecodeArrowNonNull(
+          static_cast<int>(values_to_read), &accumulator_);
+      CheckNumberDecoded(num_decoded, values_to_read);
+      ResetValues();
+    }
+  }
+
+  // Decode a run of values_to_read slots of which null_count are null.
+  //
+  // Zero-copy path (uses_opt_ is true): the decoder is handed the validity
+  // bitmap with bit offset values_written_, a pointer into offset_ at entry
+  // values_written_, the values_ buffer and the address of bianry_length_.
+  // Builder path: values are decoded into accumulator_ and ResetValues() is
+  // called afterwards.
+  // Both paths pass the decoded count and the expected non-null count
+  // (values_to_read - null_count) to CheckNumberDecoded.
+  void ReadValuesSpaced(int64_t values_to_read, int64_t null_count) override {
+    if (uses_opt_) {
+      int32_t* offsets_out =
+          reinterpret_cast<int32_t*>(offset_->mutable_data()) + values_written_;
+      int64_t num_decoded = this->current_decoder_->DecodeArrowZeroCopy(
+          static_cast<int>(values_to_read), static_cast<int>(null_count),
+          valid_bits_->mutable_data(), offsets_out, values_, values_written_,
+          &bianry_length_);
+      // Use the same check as the builder path rather than a debug-only DCHECK.
+      CheckNumberDecoded(num_decoded, values_to_read - null_count);
+    } else {
+      int64_t num_decoded = this->current_decoder_->DecodeArrow(
+          static_cast<int>(values_to_read), static_cast<int>(null_count),
+          valid_bits_->mutable_data(), values_written_, &accumulator_);
+      CheckNumberDecoded(num_decoded, values_to_read - null_count);
+      ResetValues();
+    }
+  }
+
+  // Ensure capacity for extra_values more values.
+  //
+  // When UpdateCapacity() reports a larger capacity, values_ is resized to
+  // capacity * binary_per_row_length_ bytes, offset_ to (capacity + 1) int32
+  // entries, and the first offset entry is set to 0. For nullable leaves the
+  // validity bitmap is grown to cover values_capacity_ bits and the bytes past
+  // those covering values_written_ bits are zeroed.
+  void ReserveValues(int64_t extra_values) override {
+    const int64_t wanted_capacity =
+        UpdateCapacity(values_capacity_, values_written_, extra_values);
+    if (wanted_capacity > values_capacity_) {
+      // Offsets are stored as 32-bit integers.
+      constexpr int64_t kOffsetWidth = sizeof(int32_t);
+      PARQUET_THROW_NOT_OK(
+          values_->Resize(wanted_capacity * binary_per_row_length_, false));
+      PARQUET_THROW_NOT_OK(offset_->Resize((wanted_capacity + 1) * kOffsetWidth, false));
+
+      reinterpret_cast<int32_t*>(offset_->mutable_data())[0] = 0;
+
+      values_capacity_ = wanted_capacity;
+    }
+
+    if (!leaf_info_.HasNullableValues()) {
+      return;
+    }
+    const int64_t bitmap_bytes_needed = bit_util::BytesForBits(values_capacity_);
+    if (valid_bits_->size() >= bitmap_bytes_needed) {
+      return;
+    }
+    const int64_t bitmap_bytes_in_use = bit_util::BytesForBits(values_written_);
+    PARQUET_THROW_NOT_OK(valid_bits_->Resize(bitmap_bytes_needed, false));
+    // Avoid valgrind warnings
+    memset(valid_bits_->mutable_data() + bitmap_bytes_in_use, 0,
+           bitmap_bytes_needed - bitmap_bytes_in_use);
+  }
+  // Hand the current value buffer to the caller, replace it with a buffer from
+  // AllocateBuffer(pool_), and reset values_capacity_ to 0.
+  std::shared_ptr<ResizableBuffer> ReleaseValues() override {
+    std::shared_ptr<ResizableBuffer> released = values_;
+    values_ = AllocateBuffer(this->pool_);
+    values_capacity_ = 0;
+    return released;
+  }
+  // Hand the current offsets buffer to the caller, replace it with a buffer
+  // from AllocateBuffer(pool_), and reset bianry_length_ to 0.
+  //
+  // The first time this is called with at least one written value, it stores
+  // (last_offset - first_offset) / values_written_ + 1 in
+  // binary_per_row_length_, which ReserveValues() uses to size values_.
+  std::shared_ptr<ResizableBuffer> ReleaseOffsets() override {
+    auto result = offset_;
+    // Skip the estimate while nothing has been written: dividing by a zero
+    // values_written_ is undefined behaviour, and the offsets buffer may not
+    // have been sized yet. Leaving hasCal_average_len_ unset lets a later,
+    // non-empty release compute the estimate instead.
+    if (ARROW_PREDICT_FALSE(!hasCal_average_len_) && values_written_ > 0) {
+      auto offsetArr = reinterpret_cast<int32_t*>(offset_->mutable_data());
+      const auto first_offset = offsetArr[0];
+      const auto last_offset = offsetArr[values_written_];
+      int64_t binary_length = last_offset - first_offset;
+      binary_per_row_length_ = binary_length / values_written_ + 1;
+      hasCal_average_len_ = true;
+    }
+    offset_ = AllocateBuffer(this->pool_);
+    bianry_length_ = 0;
+    return result;
+  }
+  // Discard buffered values: a no-op unless values_written_ is positive.
+  // Otherwise the validity, offset and value buffers are resized to 0 and the
+  // written/capacity/null counters and bianry_length_ are zeroed.
+  void ResetValues() {
+    if (values_written_ <= 0) {
+      return;
+    }
+    // Resize to 0, but do not shrink to fit
+    PARQUET_THROW_NOT_OK(valid_bits_->Resize(0, false));
+    PARQUET_THROW_NOT_OK(offset_->Resize(0, false));
+    PARQUET_THROW_NOT_OK(values_->Resize(0, false));
+
+    bianry_length_ = 0;
+    null_count_ = 0;
+    values_capacity_ = 0;
+    values_written_ = 0;
+  }
+
+ private:
+  // Helper data structure for accumulating builder chunks
+  typename EncodingTraits<ByteArrayType>::Accumulator accumulator_;
+
+  // Passed by address to DecodeArrowZeroCopy() and zeroed by ReleaseOffsets()
+  // and ResetValues(). NOTE(review): presumably the running byte length of the
+  // decoded values; "bianry" looks like a misspelling of "binary" - confirm.
+  int32_t bianry_length_ = 0;

Review Comment:
   done



##########
cpp/src/parquet/column_reader.h:
##########
@@ -55,6 +55,8 @@ static constexpr uint32_t kDefaultMaxPageHeaderSize = 16 * 
1024 * 1024;
 // 16 KB is the default expected page header size
 static constexpr uint32_t kDefaultPageHeaderSize = 16 * 1024;
 
+// NOTE(review): presumably the initial per-row byte estimate for binary values;
+// its use is not visible in this hunk - confirm. "Szie" looks like a
+// misspelling of "Size".
+static constexpr int32_t kDefaultBinaryPerRowSzie = 20;

Review Comment:
   done



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@arrow.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to