This is an automated email from the ASF dual-hosted git repository.
wjones127 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow.git
The following commit(s) were added to refs/heads/main by this push:
new 28ca876dc4 GH-34335: [C++][Parquet] Optimize Decoding
DELTA_LENGTH_BYTE_ARRAY (#34955)
28ca876dc4 is described below
commit 28ca876dc41b696bda8159daa1c4e9be2b799c48
Author: mwish <[email protected]>
AuthorDate: Sun Apr 9 02:16:11 2023 +0800
GH-34335: [C++][Parquet] Optimize Decoding DELTA_LENGTH_BYTE_ARRAY (#34955)
### Rationale for this change
According to https://github.com/apache/arrow/pull/34323,
decoding DELTA_LENGTH_BYTE_ARRAY is much slower, so this change applies some optimizations.
### What changes are included in this PR?
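Some small changes to the decoders in `cpp/src/parquet/encoding.cc`: `DeltaLengthByteArrayDecoder` no longer copies value bytes into the intermediate `buffered_data_` buffer; it advances the bit reader and points the output `ByteArray` entries directly at the input data. Its `SetDecoder` method is removed, and `DeltaByteArrayDecoder` now calls `SetData` on the suffix decoder instead.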
### Are these changes tested?
### Are there any user-facing changes?
* Closes: #34335
Authored-by: mwish <[email protected]>
Signed-off-by: Will Jones <[email protected]>
---
cpp/src/parquet/encoding.cc | 26 ++++++++++----------------
1 file changed, 10 insertions(+), 16 deletions(-)
diff --git a/cpp/src/parquet/encoding.cc b/cpp/src/parquet/encoding.cc
index 1465022cb2..a28e0c65d8 100644
--- a/cpp/src/parquet/encoding.cc
+++ b/cpp/src/parquet/encoding.cc
@@ -2718,21 +2718,14 @@ class DeltaLengthByteArrayDecoder : public DecoderImpl,
MemoryPool* pool =
::arrow::default_memory_pool())
: DecoderImpl(descr, Encoding::DELTA_LENGTH_BYTE_ARRAY),
len_decoder_(nullptr, pool),
- buffered_length_(AllocateBuffer(pool, 0)),
- buffered_data_(AllocateBuffer(pool, 0)) {}
+ buffered_length_(AllocateBuffer(pool, 0)) {}
void SetData(int num_values, const uint8_t* data, int len) override {
- num_values_ = num_values;
+ DecoderImpl::SetData(num_values, data, len);
decoder_ = std::make_shared<::arrow::bit_util::BitReader>(data, len);
DecodeLengths();
}
- void SetDecoder(int num_values,
std::shared_ptr<::arrow::bit_util::BitReader> decoder) {
- num_values_ = num_values;
- decoder_ = decoder;
- DecodeLengths();
- }
-
int Decode(ByteArray* buffer, int max_values) override {
// Decode up to `max_values` strings into an internal buffer
// and reference them into `buffer`.
@@ -2745,6 +2738,7 @@ class DeltaLengthByteArrayDecoder : public DecoderImpl,
int32_t data_size = 0;
const int32_t* length_ptr =
reinterpret_cast<const int32_t*>(buffered_length_->data()) +
length_idx_;
+ int bytes_offset = len_ - decoder_->bytes_left();
for (int i = 0; i < max_values; ++i) {
int32_t len = length_ptr[i];
if (ARROW_PREDICT_FALSE(len < 0)) {
@@ -2756,13 +2750,10 @@ class DeltaLengthByteArrayDecoder : public DecoderImpl,
}
}
length_idx_ += max_values;
-
- PARQUET_THROW_NOT_OK(buffered_data_->Resize(data_size));
- if (decoder_->GetBatch(8, buffered_data_->mutable_data(), data_size) !=
data_size) {
+ if (ARROW_PREDICT_FALSE(!decoder_->Advance(8 *
static_cast<int64_t>(data_size)))) {
ParquetException::EofException();
}
- const uint8_t* data_ptr = buffered_data_->data();
-
+ const uint8_t* data_ptr = data_ + bytes_offset;
for (int i = 0; i < max_values; ++i) {
buffer[i].ptr = data_ptr;
data_ptr += buffer[i].len;
@@ -2850,7 +2841,6 @@ class DeltaLengthByteArrayDecoder : public DecoderImpl,
int num_valid_values_;
uint32_t length_idx_;
std::shared_ptr<ResizableBuffer> buffered_length_;
- std::shared_ptr<ResizableBuffer> buffered_data_;
};
// ----------------------------------------------------------------------
@@ -3071,8 +3061,12 @@ class DeltaByteArrayDecoder : public DecoderImpl,
prefix_len_offset_ = 0;
num_valid_values_ = num_prefix;
+ int bytes_left = decoder_->bytes_left();
+ // If len < bytes_left, prefix_len_decoder.Decode will throw exception.
+ DCHECK_GE(len, bytes_left);
+ int suffix_begins = len - bytes_left;
// at this time, the decoder_ will be at the start of the encoded suffix
data.
- suffix_decoder_.SetDecoder(num_values, decoder_);
+ suffix_decoder_.SetData(num_values, data + suffix_begins, bytes_left);
// TODO: read corrupted files written with bug(PARQUET-246). last_value_
should be set
// to last_value_in_previous_page_ when decoding a new page(except the
first page)