This is an automated email from the ASF dual-hosted git repository. tarmstrong pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/impala.git
commit 969dea84f41cfccf863b14904958a5d2be91983f Author: Zoltan Borok-Nagy <borokna...@cloudera.com> AuthorDate: Mon Jan 7 16:50:36 2019 +0100 IMPALA-7979: Enhance decoders to support value-skipping This commit adds value-skipping functionality to the decoders. Value-skipping is useful when we know that we won't need the following N values, so instead of decoding and dropping them, we can just "jump" through them. This feature is a prerequisite for Parquet page skipping (IMPALA-5843). I added backend tests for all the decoders. Backend tests related to bitpacking are moved to the newly created bit-stream-utils-test. Change-Id: Ib848f1bd71735fe84e8064daf700417b32589f57 Reviewed-on: http://gerrit.cloudera.org:8080/12172 Reviewed-by: Impala Public Jenkins <impala-public-jenk...@cloudera.com> Tested-by: Impala Public Jenkins <impala-public-jenk...@cloudera.com> --- be/src/exec/parquet/CMakeLists.txt | 1 + be/src/exec/parquet/parquet-bool-decoder-test.cc | 108 ++++++++ be/src/exec/parquet/parquet-bool-decoder.cc | 28 ++ be/src/exec/parquet/parquet-bool-decoder.h | 4 + be/src/util/CMakeLists.txt | 1 + be/src/util/bit-stream-utils-test.cc | 207 ++++++++++++++ be/src/util/bit-stream-utils.h | 5 + be/src/util/bit-stream-utils.inline.h | 12 + be/src/util/dict-encoding.h | 20 ++ be/src/util/dict-test.cc | 92 ++++++- be/src/util/rle-encoding.h | 87 ++++++ be/src/util/rle-test.cc | 330 ++++++++++++++--------- 12 files changed, 765 insertions(+), 130 deletions(-) diff --git a/be/src/exec/parquet/CMakeLists.txt b/be/src/exec/parquet/CMakeLists.txt index a3ac2de..7bf24e2 100644 --- a/be/src/exec/parquet/CMakeLists.txt +++ b/be/src/exec/parquet/CMakeLists.txt @@ -38,6 +38,7 @@ add_library(Parquet add_dependencies(Parquet gen-deps) +ADD_BE_LSAN_TEST(parquet-bool-decoder-test) ADD_BE_LSAN_TEST(parquet-plain-test) ADD_BE_LSAN_TEST(parquet-version-test) ADD_BE_LSAN_TEST(hdfs-parquet-scanner-test) diff --git a/be/src/exec/parquet/parquet-bool-decoder-test.cc
b/be/src/exec/parquet/parquet-bool-decoder-test.cc new file mode 100644 index 0000000..a414546 --- /dev/null +++ b/be/src/exec/parquet/parquet-bool-decoder-test.cc @@ -0,0 +1,108 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "exec/parquet/parquet-bool-decoder.h" +#include "testutil/gtest-util.h" + +#include <vector> + +#include "common/names.h" + +namespace impala { + +void EncodeData(const vector<bool>& data, parquet::Encoding::type encoding, + uint8_t* buffer, int buffer_len) { + if (encoding == parquet::Encoding::PLAIN) { + BitWriter writer(buffer, buffer_len); + for (int b : data) { + ASSERT_TRUE(writer.PutValue(b, 1)); + } + writer.Flush(); + } else { + DCHECK(encoding == parquet::Encoding::RLE); + // We need to pass 'buffer + 4' because the ParquetBoolDecoder ignores the first 4 + // bytes (in Parquet RLE, the first 4 bytes are used to encode the data size). 
+ RleEncoder encoder(buffer + 4, buffer_len - 4, 1, 8); + for (int b : data) { + ASSERT_TRUE(encoder.Put(b)); + } + encoder.Flush(); + } +} + +void TestSkipping(parquet::Encoding::type encoding, uint8_t* encoded_data, + int encoded_data_len, const vector<bool>& expected_data, int skip_at, + int skip_count) { + using namespace parquet; + ParquetBoolDecoder decoder; + decoder.SetData(encoding, encoded_data, encoded_data_len); + + auto Decode = [encoding, &decoder]() { + bool b; + if (encoding == Encoding::PLAIN) { + EXPECT_TRUE(decoder.DecodeValue<Encoding::PLAIN>(&b)); + } else { + EXPECT_TRUE(decoder.DecodeValue<Encoding::RLE>(&b)); + } + return b; + }; + + for (int i = 0; i < skip_at && i < expected_data.size(); ++i) { + EXPECT_EQ(Decode(), expected_data[i]) << i; + } + EXPECT_TRUE(decoder.SkipValues(skip_count)); + for (int i = skip_at + skip_count; i < expected_data.size(); ++i) { + EXPECT_EQ(Decode(), expected_data[i]) << i; + } +} + +TEST(ParquetBoolDecoder, TestDecodeAndSkipping) { + vector<bool> expected_data; + // Write 100 falses, 100 trues, 100 alternating falses and trues, 100 falses + for (int i = 0; i < 100; ++i) expected_data.push_back(false); + for (int i = 0; i < 100; ++i) expected_data.push_back(true); + for (int i = 0; i < 100; ++i) expected_data.push_back(i % 2); + for (int i = 0; i < 100; ++i) expected_data.push_back(false); + + for (auto encoding : {parquet::Encoding::PLAIN, parquet::Encoding::RLE}) { + constexpr int buffer_len = 128; + uint8_t buffer[buffer_len]; + EncodeData(expected_data, encoding, buffer, buffer_len); + TestSkipping(encoding, buffer, buffer_len, expected_data, 0, 8); + TestSkipping(encoding, buffer, buffer_len, expected_data, 0, 79); + TestSkipping(encoding, buffer, buffer_len, expected_data, 0, 160); + TestSkipping(encoding, buffer, buffer_len, expected_data, 0, 260); + TestSkipping(encoding, buffer, buffer_len, expected_data, 0, 370); + TestSkipping(encoding, buffer, buffer_len, expected_data, 27, 13); + TestSkipping(encoding, buffer,
buffer_len, expected_data, 50, 112); + TestSkipping(encoding, buffer, buffer_len, expected_data, 50, 183); + TestSkipping(encoding, buffer, buffer_len, expected_data, 50, 270); + TestSkipping(encoding, buffer, buffer_len, expected_data, 50, 350); + TestSkipping(encoding, buffer, buffer_len, expected_data, 123, 8); + TestSkipping(encoding, buffer, buffer_len, expected_data, 125, 100); + TestSkipping(encoding, buffer, buffer_len, expected_data, 225, 17); + TestSkipping(encoding, buffer, buffer_len, expected_data, 225, 70); + TestSkipping(encoding, buffer, buffer_len, expected_data, 235, 160); + TestSkipping(encoding, buffer, buffer_len, expected_data, 337, 17); + TestSkipping(encoding, buffer, buffer_len, expected_data, 337, 60); + TestSkipping(encoding, buffer, buffer_len, expected_data, 337, 63); + } +} + +} + +IMPALA_TEST_MAIN(); diff --git a/be/src/exec/parquet/parquet-bool-decoder.cc b/be/src/exec/parquet/parquet-bool-decoder.cc index fdcd1d5..31b9998 100644 --- a/be/src/exec/parquet/parquet-bool-decoder.cc +++ b/be/src/exec/parquet/parquet-bool-decoder.cc @@ -65,4 +65,32 @@ bool ParquetBoolDecoder::DecodeValues( } return true; } + +bool ParquetBoolDecoder::SkipValues(int num_values) { + DCHECK_GT(num_values, 0); + int skip_cached = min(num_unpacked_values_ - unpacked_value_idx_, num_values); + unpacked_value_idx_ += skip_cached; + if (skip_cached == num_values) return true; + int num_remaining = num_values - skip_cached; + if (encoding_ == parquet::Encoding::PLAIN) { + int num_to_skip = BitUtil::RoundDownToPowerOf2(num_remaining, 32); + if (num_to_skip > 0 && !bool_values_.SkipBatch(1, num_to_skip)) return false; + num_remaining -= num_to_skip; + if (num_remaining > 0) { + DCHECK_LE(num_remaining, UNPACKED_BUFFER_LEN); + num_unpacked_values_ = bool_values_.UnpackBatch(1, UNPACKED_BUFFER_LEN, + &unpacked_values_[0]); + if (UNLIKELY(num_unpacked_values_ < num_remaining)) return false; + unpacked_value_idx_ = num_remaining; + } + return true; + } else { + //
rle_decoder_.SkipValues() might fill its internal buffer 'literal_buffer_'. + // This can result in sub-optimal decoding later, because 'literal_buffer_' might + // be used again and again, especially when reading a very long literal run. + DCHECK_EQ(encoding_, parquet::Encoding::RLE); + return rle_decoder_.SkipValues(num_remaining) == num_remaining; + } +} + } // namespace impala diff --git a/be/src/exec/parquet/parquet-bool-decoder.h b/be/src/exec/parquet/parquet-bool-decoder.h index 6ecdbd2..1498ca6 100644 --- a/be/src/exec/parquet/parquet-bool-decoder.h +++ b/be/src/exec/parquet/parquet-bool-decoder.h @@ -40,6 +40,10 @@ class ParquetBoolDecoder { /// Batched version of DecodeValue() that decodes multiple values at a time. bool DecodeValues(int64_t stride, int64_t count, bool* RESTRICT first_value) RESTRICT; + /// Skip 'num_values' values from the column data. + ///TODO: add e2e tests when page filtering is implemented (IMPALA-5843). + bool SkipValues(int num_values) RESTRICT; + private: /// Implementation of DecodeValues, templated by ENCODING. template <parquet::Encoding::type ENCODING> diff --git a/be/src/util/CMakeLists.txt b/be/src/util/CMakeLists.txt index 1405dd8..e80560e 100644 --- a/be/src/util/CMakeLists.txt +++ b/be/src/util/CMakeLists.txt @@ -113,6 +113,7 @@ target_link_libraries(loggingsupport ${IMPALA_LINK_LIBS_DYNAMIC_TARGETS}) ADD_BE_LSAN_TEST(benchmark-test) ADD_BE_LSAN_TEST(bitmap-test) ADD_BE_LSAN_TEST(bit-packing-test) +ADD_BE_LSAN_TEST(bit-stream-utils-test) ADD_BE_LSAN_TEST(bit-util-test) ADD_BE_LSAN_TEST(blocking-queue-test) ADD_BE_LSAN_TEST(bloom-filter-test) diff --git a/be/src/util/bit-stream-utils-test.cc b/be/src/util/bit-stream-utils-test.cc new file mode 100644 index 0000000..56879e7 --- /dev/null +++ b/be/src/util/bit-stream-utils-test.cc @@ -0,0 +1,207 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. 
See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "testutil/gtest-util.h" +#include "util/bit-packing.inline.h" +#include "util/bit-stream-utils.inline.h" + +#include "common/names.h" + +namespace impala { + +const int MAX_WIDTH = BatchedBitReader::MAX_BITWIDTH; + +TEST(BitArray, TestBool) { + const int len = 8; + uint8_t buffer[len]; + + BitWriter writer(buffer, len); + + // Write alternating 0's and 1's + for (int i = 0; i < 8; ++i) { + bool result = writer.PutValue(static_cast<uint64_t>(i % 2), 1); + EXPECT_TRUE(result); + } + writer.Flush(); + EXPECT_EQ((int)buffer[0], BOOST_BINARY(1 0 1 0 1 0 1 0)); + + // Write 00110011 + for (int i = 0; i < 8; ++i) { + bool result; + switch (i) { + case 0: + case 1: + case 4: + case 5: + result = writer.PutValue(0, 1); + break; + default: + result = writer.PutValue(1, 1); + break; + } + EXPECT_TRUE(result); + } + writer.Flush(); + + // Validate the exact bit value + EXPECT_EQ((int)buffer[0], BOOST_BINARY(1 0 1 0 1 0 1 0)); + EXPECT_EQ((int)buffer[1], BOOST_BINARY(1 1 0 0 1 1 0 0)); + + // Use the reader and validate + BatchedBitReader reader(buffer, len); + + // Ensure it returns the same results after Reset(). 
+ for (int trial = 0; trial < 2; ++trial) { + bool batch_vals[16]; + EXPECT_EQ(16, reader.UnpackBatch(1, 16, batch_vals)); + for (int i = 0; i < 8; ++i) EXPECT_EQ(batch_vals[i], i % 2); + + for (int i = 0; i < 8; ++i) { + switch (i) { + case 0: + case 1: + case 4: + case 5: + EXPECT_EQ(batch_vals[8 + i], false); + break; + default: + EXPECT_EQ(batch_vals[8 + i], true); + break; + } + } + reader.Reset(buffer, len); + } +} + +// Tests SkipBatch() by comparing the results of a reader with skipping +// against a reader that doesn't skip. +template <typename T> +void TestSkipping(const uint8_t* buffer, const int len, const int bit_width, + const int skip_at, const int skip_count) { + constexpr int MAX_LEN = 512; + const int value_count = len * 8 / bit_width; + DCHECK_LE(value_count, MAX_LEN); + DCHECK_EQ((skip_at * bit_width) % 8, 0); + DCHECK_EQ((skip_count * bit_width) % 8, 0); + + T all_vals[MAX_LEN]; + BatchedBitReader expected_reader(buffer, len); + expected_reader.UnpackBatch(bit_width, value_count, all_vals); + + BatchedBitReader skipping_reader(buffer, len); + T vals[MAX_LEN]; + skipping_reader.UnpackBatch(bit_width, skip_at, vals); + EXPECT_TRUE(skipping_reader.SkipBatch(bit_width, skip_count)); + skipping_reader.UnpackBatch(bit_width, value_count - skip_count, vals + skip_at); + + for (int i = 0; i < skip_at; ++i) { + EXPECT_EQ(all_vals[i], vals[i]); + } + for (int i = skip_at + skip_count; i < value_count; ++i) { + EXPECT_EQ(all_vals[i], vals[i - skip_count]); + } +} + +TEST(BitArray, TestBoolSkip) { + const int len = 4; + uint8_t buffer[len]; + + BitWriter writer(buffer, len); + // Write 00000000 11111111 11111111 00000000 + for (int i = 0; i < 8; ++i) ASSERT_TRUE(writer.PutValue(0, 1)); + for (int i = 0; i < 16; ++i) ASSERT_TRUE(writer.PutValue(1, 1)); + for (int i = 0; i < 8; ++i) ASSERT_TRUE(writer.PutValue(0, 1)); + writer.Flush(); + + TestSkipping<bool>(buffer, len, 1, 0, 8); + TestSkipping<bool>(buffer, len, 1, 0, 16); + TestSkipping<bool>(buffer, len, 1, 8, 8); +
TestSkipping<bool>(buffer, len, 1, 8, 16); + TestSkipping<bool>(buffer, len, 1, 16, 8); + TestSkipping<bool>(buffer, len, 1, 16, 16); +} + +TEST(BitArray, TestIntSkip) { + constexpr int len = 512; + const int bit_width = 6; + uint8_t buffer[len]; + + BitWriter writer(buffer, len); + for (int i = 0; i < (1 << bit_width); ++i) { + ASSERT_TRUE(writer.PutValue(i, bit_width)); + } + int bytes_written = writer.bytes_written(); + TestSkipping<int>(buffer, bytes_written, bit_width, 0, 4); + TestSkipping<int>(buffer, bytes_written, bit_width, 4, 4); + TestSkipping<int>(buffer, bytes_written, bit_width, 4, 8); + TestSkipping<int>(buffer, bytes_written, bit_width, 8, 56); +} + +// Writes 'num_vals' values with width 'bit_width' and reads them back. +void TestBitArrayValues(int bit_width, int num_vals) { + const int len = BitUtil::Ceil(bit_width * num_vals, 8); + const int64_t mod = bit_width == 64 ? 1 : 1LL << bit_width; + + uint8_t buffer[(len > 0) ? len : 1]; + BitWriter writer(buffer, len); + for (int i = 0; i < num_vals; ++i) { + bool result = writer.PutValue(i % mod, bit_width); + EXPECT_TRUE(result); + } + writer.Flush(); + EXPECT_EQ(writer.bytes_written(), len); + + BatchedBitReader reader(buffer, len); + BatchedBitReader reader2(reader); // Test copy constructor. + // Ensure it returns the same results after Reset(). + for (int trial = 0; trial < 2; ++trial) { + // Unpack all values at once with one batched reader and in small batches with the + // other batched reader. 
+ vector<int64_t> batch_vals(num_vals); + const int BATCH_SIZE = 32; + vector<int64_t> batch_vals2(BATCH_SIZE); + EXPECT_EQ(num_vals, + reader.UnpackBatch(bit_width, num_vals, batch_vals.data())); + for (int i = 0; i < num_vals; ++i) { + if (i % BATCH_SIZE == 0) { + int num_to_unpack = min(BATCH_SIZE, num_vals - i); + EXPECT_EQ(num_to_unpack, + reader2.UnpackBatch(bit_width, num_to_unpack, batch_vals2.data())); + } + EXPECT_EQ(i % mod, batch_vals[i]); + EXPECT_EQ(i % mod, batch_vals2[i % BATCH_SIZE]); + } + EXPECT_EQ(reader.bytes_left(), 0); + EXPECT_EQ(reader2.bytes_left(), 0); + reader.Reset(buffer, len); + reader2.Reset(buffer, len); + } +} + +TEST(BitArray, TestValues) { + for (int width = 0; width <= MAX_WIDTH; ++width) { + TestBitArrayValues(width, 1); + TestBitArrayValues(width, 2); + // Don't write too many values + TestBitArrayValues(width, (width < 12) ? (1 << width) : 4096); + TestBitArrayValues(width, 1024); + } +} + +} + +IMPALA_TEST_MAIN(); diff --git a/be/src/util/bit-stream-utils.h b/be/src/util/bit-stream-utils.h index a7ef292..9889f35 100644 --- a/be/src/util/bit-stream-utils.h +++ b/be/src/util/bit-stream-utils.h @@ -133,6 +133,11 @@ class BatchedBitReader { template<typename T> int UnpackBatch(int bit_width, int num_values, T* v); + /// Skip 'num_values_to_skip' bit-packed values. + /// 'num_values_to_skip * bit_width' is either divisible by 8, or + /// 'num_values_to_skip' equals to the count of the remaining bit-packed values. + bool SkipBatch(int bit_width, int num_values_to_skip); + /// Unpack bit-packed values in the same way as UnpackBatch() and decode them using the /// dictionary 'dict' with 'dict_len' entries. Return -1 if a decoding error is /// encountered, i.e. if the bit-packed values are not valid indices in 'dict'. 
diff --git a/be/src/util/bit-stream-utils.inline.h b/be/src/util/bit-stream-utils.inline.h index 48f52da..08f1700 100644 --- a/be/src/util/bit-stream-utils.inline.h +++ b/be/src/util/bit-stream-utils.inline.h @@ -102,6 +102,18 @@ inline int BatchedBitReader::UnpackBatch(int bit_width, int num_values, T* v) { return static_cast<int>(num_read); } +inline bool BatchedBitReader::SkipBatch(int bit_width, int num_values_to_skip) { + DCHECK(buffer_pos_ != nullptr); + DCHECK_GT(bit_width, 0); + DCHECK_LE(bit_width, MAX_BITWIDTH); + DCHECK_GT(num_values_to_skip, 0); + + int skip_bytes = BitUtil::RoundUpNumBytes(bit_width * num_values_to_skip); + if (skip_bytes > buffer_end_ - buffer_pos_) return false; + buffer_pos_ += skip_bytes; + return true; +} + template <typename T> inline int BatchedBitReader::UnpackAndDecodeBatch( int bit_width, T* dict, int64_t dict_len, int num_values, T* v, int64_t stride) { diff --git a/be/src/util/dict-encoding.h b/be/src/util/dict-encoding.h index cc7ef82..78dc42f 100644 --- a/be/src/util/dict-encoding.h +++ b/be/src/util/dict-encoding.h @@ -366,6 +366,9 @@ class DictDecoder : public DictDecoderBase { return sizeof(T) * dict_.size(); } + /// Skip 'num_values' values from the input. 
+ bool SkipValues(int64_t num_values) WARN_UNUSED_RESULT; + private: /// List of decoded values stored in the dict_ std::vector<T> dict_; @@ -536,6 +539,23 @@ ALWAYS_INLINE inline bool DictDecoder<T>::GetNextValues( } template <typename T> +ALWAYS_INLINE inline bool DictDecoder<T>::SkipValues(int64_t num_values) { + int64_t num_remaining = num_values; + if (num_repeats_ > 0) { + int64_t num_to_skip = std::min(num_remaining, num_repeats_); + num_repeats_ -= num_to_skip; + num_remaining -= num_to_skip; + } else if (next_literal_idx_ < num_literal_values_) { + int64_t num_to_skip = std::min<int64_t>(num_literal_values_ - + next_literal_idx_, num_remaining); + next_literal_idx_ += num_to_skip; + num_remaining -= num_to_skip; + } + if (num_remaining > 0) return data_decoder_.SkipValues(num_remaining) == num_remaining; + return true; +} + +template <typename T> uint32_t DictDecoder<T>::CopyLiteralsToOutput( uint32_t max_to_copy, StrideWriter<T>* out) { uint32_t num_to_copy = diff --git a/be/src/util/dict-test.cc b/be/src/util/dict-test.cc index c30c30b..ebfba11 100644 --- a/be/src/util/dict-test.cc +++ b/be/src/util/dict-test.cc @@ -32,9 +32,29 @@ namespace impala { +// Helper function to validate that 'decoder' decodes the expected values. +// If 'skip_at' and 'skip_count' aren't zero, then the value skipping logic +// is also exercised. 
+template<typename InternalType> +void ValidateValues(DictDecoder<InternalType>& decoder, +const vector<InternalType>& values, int skip_at, int skip_count, bool skip_success) { + for (int i = 0; i < skip_at; ++i) { + InternalType j; + ASSERT_TRUE(decoder.GetNextValue(&j)); + EXPECT_EQ(values[i], j) << i; + } + EXPECT_EQ(decoder.SkipValues(skip_count), skip_success); + for (int i = skip_at + skip_count; i < values.size(); ++i) { + InternalType j; + ASSERT_TRUE(decoder.GetNextValue(&j)); + EXPECT_EQ(values[i], j) << i; + } +} + template<typename InternalType, parquet::Type::type PARQUET_TYPE> void ValidateDict(const vector<InternalType>& values, - const vector<InternalType>& dict_values, int fixed_buffer_byte_size) { + const vector<InternalType>& dict_values, int fixed_buffer_byte_size, + int skip_at = 0, int skip_count = 0, bool skip_success = true) { set<InternalType> values_set(values.begin(), values.end()); int bytes_alloc = 0; @@ -51,7 +71,7 @@ void ValidateDict(const vector<InternalType>& values, uint8_t dict_buffer[encoder.dict_encoded_size()]; encoder.WriteDict(dict_buffer); - int data_buffer_len = encoder.EstimatedDataEncodedSize(); + int data_buffer_len = encoder.EstimatedDataEncodedSize() * 2; uint8_t data_buffer[data_buffer_len]; int data_len = encoder.WriteData(data_buffer, data_buffer_len); EXPECT_GT(data_len, 0); @@ -74,13 +94,7 @@ void ValidateDict(const vector<InternalType>& values, } // Test access to dictionary via internal stream ASSERT_OK(decoder.SetData(data_buffer, data_len)); - int count = 0; - for (InternalType i : values) { - InternalType j; - ASSERT_TRUE(decoder.GetNextValue(&j)); - EXPECT_EQ(i, j) << count; - ++count; - } + ValidateValues(decoder, values, skip_at, skip_count, skip_success); pool.FreeAll(); } @@ -322,6 +336,66 @@ TEST(DictTest, DecodeErrors) { } } +TEST(DictTest, TestSkippingValues) { + auto ValidateSkipping = [](const vector<int32_t>& values, + const vector<int32_t>& dict_values, int skip_at, int skip_count, + bool 
skip_success = true) { + const int value_byte_size = ParquetPlainEncoder::EncodedByteSize( + ColumnType(TYPE_INT)); + ValidateDict<int32_t, parquet::Type::INT32>(values, dict_values, value_byte_size, + skip_at, skip_count, skip_success); + }; + + vector<int32_t> literal_values; + for (int i = 0; i < 200; ++i) literal_values.push_back(i); + ValidateSkipping(literal_values, literal_values, 0, 4); + ValidateSkipping(literal_values, literal_values, 0, 130); + ValidateSkipping(literal_values, literal_values, 2, 4); + ValidateSkipping(literal_values, literal_values, 4, 48); + ValidateSkipping(literal_values, literal_values, 7, 130); + // Skipping too many values should fail + ValidateSkipping(literal_values, literal_values, 4, 300, false); + + vector<int32_t> repeated_values(200, 1000); + ValidateSkipping(repeated_values, {1000}, 0, 4); + ValidateSkipping(repeated_values, {1000}, 0, 49); + ValidateSkipping(repeated_values, {1000}, 0, 145); + ValidateSkipping(repeated_values, {1000}, 3, 4); + ValidateSkipping(repeated_values, {1000}, 4, 49); + ValidateSkipping(repeated_values, {1000}, 4, 150); + // Skipping too many values should fail + ValidateSkipping(repeated_values, {1000}, 4, 300, false); + + auto Concat = [](const vector<int32_t>& a, const vector<int32_t>& b) { + vector<int32_t> ab(a); + ab.insert(ab.end(), b.begin(), b.end()); + return ab; + }; + vector<int32_t> literal_then_repeated = Concat(literal_values, repeated_values); + vector<int32_t> literal_then_repeated_dict = Concat(literal_values, {1000}); + ValidateSkipping(literal_then_repeated, literal_then_repeated_dict, 0, 4); + ValidateSkipping(literal_then_repeated, literal_then_repeated_dict, 0, 87); + ValidateSkipping(literal_then_repeated, literal_then_repeated_dict, 0, 200); + ValidateSkipping(literal_then_repeated, literal_then_repeated_dict, 0, 222); + ValidateSkipping(literal_then_repeated, literal_then_repeated_dict, 4, 19); + ValidateSkipping(literal_then_repeated, literal_then_repeated_dict, 4, 200); 
+ ValidateSkipping(literal_then_repeated, literal_then_repeated_dict, 200, 47); + ValidateSkipping(literal_then_repeated, literal_then_repeated_dict, 234, 166); + + vector<int32_t> repeated_then_literal = Concat(repeated_values, literal_values); + vector<int32_t> repeated_then_literal_dict = Concat({1000}, literal_values); + ValidateSkipping(repeated_then_literal, repeated_then_literal_dict, 0, 4); + ValidateSkipping(repeated_then_literal, repeated_then_literal_dict, 0, 89); + ValidateSkipping(repeated_then_literal, repeated_then_literal_dict, 0, 200); + ValidateSkipping(repeated_then_literal, repeated_then_literal_dict, 0, 232); + ValidateSkipping(repeated_then_literal, repeated_then_literal_dict, 4, 8); + ValidateSkipping(repeated_then_literal, repeated_then_literal_dict, 4, 88); + ValidateSkipping(repeated_then_literal, repeated_then_literal_dict, 4, 288); + ValidateSkipping(repeated_then_literal, repeated_then_literal_dict, 230, 11); + ValidateSkipping(repeated_then_literal, repeated_then_literal_dict, 230, 79); + ValidateSkipping(repeated_then_literal, repeated_then_literal_dict, 230, 170); +} + } IMPALA_TEST_MAIN(); diff --git a/be/src/util/rle-encoding.h b/be/src/util/rle-encoding.h index 7dc05ab..21adc02 100644 --- a/be/src/util/rle-encoding.h +++ b/be/src/util/rle-encoding.h @@ -143,7 +143,17 @@ class RleBatchDecoder { /// Returns the number of consumed values or 0 if an error occurred. int32_t GetValues(int32_t num_values_to_consume, T* values); + /// Skip 'num_values' values. + /// Returns the number of skipped values or 0 if an error occurred. + int32_t SkipValues(int32_t num_values); + private: + /// Skip 'num_literals_to_skip' literals. + bool SkipLiteralValues(int32_t num_literals_to_skip) WARN_UNUSED_RESULT; + + /// Skip 'num_values' repeated values. + void SkipRepeatedValues(int32_t num_values); + BatchedBitReader bit_reader_; /// Number of bits needed to encode the value. 
Must be between 0 and 64 after @@ -191,6 +201,9 @@ class RleBatchDecoder { /// 'literal_count_'. Returns the number of literals outputted. int32_t OutputBufferedLiterals(int32_t max_to_output, T* values); + /// Skip up to 'max_to_skip' buffered literals. Returns the number of literals skipped. + int32_t SkipBufferedLiterals(int32_t max_to_skip); + /// Output buffered literals, advancing 'literal_buffer_pos_' and decrementing /// 'literal_count_'. Returns the number of literals outputted or 0 if a /// decoding error is encountered. @@ -619,6 +632,13 @@ inline T RleBatchDecoder<T>::GetRepeatedValue(int32_t num_repeats_to_consume) { } template <typename T> +inline void RleBatchDecoder<T>::SkipRepeatedValues(int32_t num_values) { + DCHECK_GT(num_values, 0); + DCHECK_GE(repeat_count_, num_values); + repeat_count_ -= num_values; +} + +template <typename T> inline int32_t RleBatchDecoder<T>::NextNumLiterals() { if (literal_count_ > 0) return literal_count_; if (repeat_count_ == 0) NextCounts(); @@ -663,6 +683,34 @@ inline bool RleBatchDecoder<T>::GetLiteralValues( } template <typename T> +inline bool RleBatchDecoder<T>::SkipLiteralValues(int32_t num_literals_to_skip) { + DCHECK_GT(num_literals_to_skip, 0); + DCHECK_GE(literal_count_, num_literals_to_skip); + DCHECK(!HaveBufferedLiterals()); + + int32_t num_remaining = num_literals_to_skip; + + // Need to round to a batch of 32 if the caller is skipping only part of the current + // run to avoid ending on a non-byte boundary. + int32_t num_to_skip = std::min<int32_t>(literal_count_, + BitUtil::RoundDownToPowerOf2(num_remaining, 32)); + if (num_to_skip > 0) { + if (UNLIKELY(!bit_reader_.SkipBatch(bit_width_, num_to_skip))) return false; + literal_count_ -= num_to_skip; + num_remaining -= num_to_skip; + } + + if (num_remaining > 0) { + // Earlier we called RoundDownToPowerOf2() to skip literals that fit on byte boundary. + // But some literals still need to be skipped. Let's fill the literal buffer + // and skip 'num_remaining' values.
+ if (UNLIKELY(!FillLiteralBuffer())) return false; + if (SkipBufferedLiterals(num_remaining) != num_remaining) return false; + } + return true; +} + +template <typename T> template <typename OutType> inline bool RleBatchDecoder<T>::DecodeLiteralValues(int32_t num_literals_to_consume, OutType* dict, int64_t dict_len, StrideWriter<OutType>* RESTRICT out) { @@ -774,6 +822,16 @@ inline int32_t RleBatchDecoder<T>::OutputBufferedLiterals( } template <typename T> +inline int32_t RleBatchDecoder<T>::SkipBufferedLiterals( + int32_t max_to_skip) { + int32_t num_to_skip = + std::min<int32_t>(max_to_skip, num_buffered_literals_ - literal_buffer_pos_); + literal_buffer_pos_ += num_to_skip; + literal_count_ -= num_to_skip; + return num_to_skip; +} + +template <typename T> template <typename OutType> inline int32_t RleBatchDecoder<T>::DecodeBufferedLiterals(int32_t max_to_output, OutType* dict, int64_t dict_len, StrideWriter<OutType>* RESTRICT out) { @@ -823,6 +881,35 @@ inline int32_t RleBatchDecoder<T>::GetValues(int32_t num_values_to_consume, T* v } template <typename T> +inline int32_t RleBatchDecoder<T>::SkipValues(int32_t num_values) { + DCHECK_GT(num_values, 0); + + int32_t num_skipped = 0; + if (HaveBufferedLiterals()) { + num_skipped = SkipBufferedLiterals(num_values); + } + while (num_skipped < num_values) { + // Skip RLE encoded values + int32_t num_repeats = NextNumRepeats(); + if (num_repeats > 0) { + int32_t num_repeats_to_consume = + std::min(num_repeats, num_values - num_skipped); + SkipRepeatedValues(num_repeats_to_consume); + num_skipped += num_repeats_to_consume; + continue; + } + + // Skip literals + int32_t num_literals = NextNumLiterals(); + if (num_literals == 0) break; + int32_t num_literals_to_skip = std::min(num_literals, num_values - num_skipped); + if (!SkipLiteralValues(num_literals_to_skip)) return 0; + num_skipped += num_literals_to_skip; + } + return num_skipped; +} + +template <typename T> constexpr int RleBatchDecoder<T>::LITERAL_BUFFER_LEN; } 
diff --git a/be/src/util/rle-test.cc b/be/src/util/rle-test.cc index 4406e46..939fe19 100644 --- a/be/src/util/rle-test.cc +++ b/be/src/util/rle-test.cc @@ -21,10 +21,11 @@ #include <boost/utility.hpp> #include <math.h> +#include <random> #include "testutil/gtest-util.h" #include "util/bit-packing.inline.h" -#include "util/bit-stream-utils.inline.h" +#include "util/bit-stream-utils.h" #include "util/rle-encoding.h" #include "common/names.h" @@ -33,119 +34,6 @@ namespace impala { const int MAX_WIDTH = BatchedBitReader::MAX_BITWIDTH; -TEST(BitArray, TestBool) { - const int len = 8; - uint8_t buffer[len]; - - BitWriter writer(buffer, len); - - // Write alternating 0's and 1's - for (int i = 0; i < 8; ++i) { - bool result = writer.PutValue(static_cast<uint64_t>(i % 2), 1); - EXPECT_TRUE(result); - } - writer.Flush(); - EXPECT_EQ((int)buffer[0], BOOST_BINARY(1 0 1 0 1 0 1 0)); - - // Write 00110011 - for (int i = 0; i < 8; ++i) { - bool result; - switch (i) { - case 0: - case 1: - case 4: - case 5: - result = writer.PutValue(0, 1); - break; - default: - result = writer.PutValue(1, 1); - break; - } - EXPECT_TRUE(result); - } - writer.Flush(); - - // Validate the exact bit value - EXPECT_EQ((int)buffer[0], BOOST_BINARY(1 0 1 0 1 0 1 0)); - EXPECT_EQ((int)buffer[1], BOOST_BINARY(1 1 0 0 1 1 0 0)); - - // Use the reader and validate - BatchedBitReader reader(buffer, len); - - // Ensure it returns the same results after Reset(). - for (int trial = 0; trial < 2; ++trial) { - bool batch_vals[16]; - EXPECT_EQ(16, reader.UnpackBatch(1, 16, batch_vals)); - for (int i = 0; i < 8; ++i) EXPECT_EQ(batch_vals[i], i % 2); - - for (int i = 0; i < 8; ++i) { - switch (i) { - case 0: - case 1: - case 4: - case 5: - EXPECT_EQ(batch_vals[8 + i], false); - break; - default: - EXPECT_EQ(batch_vals[8 + i], true); - break; - } - } - reader.Reset(buffer, len); - } -} - -// Writes 'num_vals' values with width 'bit_width' and reads them back. 
-void TestBitArrayValues(int bit_width, int num_vals) { - const int len = BitUtil::Ceil(bit_width * num_vals, 8); - const int64_t mod = bit_width == 64 ? 1 : 1LL << bit_width; - - uint8_t buffer[(len > 0) ? len : 1]; - BitWriter writer(buffer, len); - for (int i = 0; i < num_vals; ++i) { - bool result = writer.PutValue(i % mod, bit_width); - EXPECT_TRUE(result); - } - writer.Flush(); - EXPECT_EQ(writer.bytes_written(), len); - - BatchedBitReader reader(buffer, len); - BatchedBitReader reader2(reader); // Test copy constructor. - // Ensure it returns the same results after Reset(). - for (int trial = 0; trial < 2; ++trial) { - // Unpack all values at once with one batched reader and in small batches with the - // other batched reader. - vector<int64_t> batch_vals(num_vals); - const int BATCH_SIZE = 32; - vector<int64_t> batch_vals2(BATCH_SIZE); - EXPECT_EQ(num_vals, - reader.UnpackBatch(bit_width, num_vals, batch_vals.data())); - for (int i = 0; i < num_vals; ++i) { - if (i % BATCH_SIZE == 0) { - int num_to_unpack = min(BATCH_SIZE, num_vals - i); - EXPECT_EQ(num_to_unpack, - reader2.UnpackBatch(bit_width, num_to_unpack, batch_vals2.data())); - } - EXPECT_EQ(i % mod, batch_vals[i]); - EXPECT_EQ(i % mod, batch_vals2[i % BATCH_SIZE]); - } - EXPECT_EQ(reader.bytes_left(), 0); - EXPECT_EQ(reader2.bytes_left(), 0); - reader.Reset(buffer, len); - reader2.Reset(buffer, len); - } -} - -TEST(BitArray, TestValues) { - for (int width = 0; width <= MAX_WIDTH; ++width) { - TestBitArrayValues(width, 1); - TestBitArrayValues(width, 2); - // Don't write too many values - TestBitArrayValues(width, (width < 12) ? (1 << width) : 4096); - TestBitArrayValues(width, 1024); - } -} - class RleTest : public ::testing::Test { protected: /// All the legal values for min_repeated_run_length to pass to RleEncoder() in tests. @@ -188,12 +76,35 @@ class RleTest : public ::testing::Test { return true; } + /// Get many values from a batch RLE decoder using its low level functions. 
+ template <typename T> + static bool GetRleValuesSkip(RleBatchDecoder<T>* decoder, int num_vals, T* vals, + int skip_at, int skip_count) { + if (!GetRleValues(decoder, skip_at, vals)) return false; + if (decoder->SkipValues(skip_count) != skip_count) return false; + int consumed = skip_at + skip_count; + if (!GetRleValues(decoder, num_vals - consumed, vals + skip_at)) return false; + return true; + } + /// Get many values from a batch RLE decoder using its GetValues() function. template <typename T> static bool GetRleValuesBatched(RleBatchDecoder<T>* decoder, int num_vals, T* vals) { return num_vals == decoder->GetValues(num_vals, vals); } + /// Get many values from a batch RLE decoder using its GetValues() function. + template <typename T> + static bool GetRleValuesBatchedSkip(RleBatchDecoder<T>* decoder, int num_vals, T* vals, + int skip_at, int skip_count) { + int cnt = 0; + if (skip_at > 0) cnt += decoder->GetValues(skip_at, vals); + if (decoder->SkipValues(skip_count) != skip_count) return false; + cnt += skip_count; + if (num_vals - cnt > 0) cnt += decoder->GetValues(num_vals - cnt, vals + skip_at); + return cnt == num_vals; + } + // Validates encoding of values by encoding and decoding them. // If expected_encoding != NULL, validates that the encoded buffer is // exactly 'expected_encoding'. 
@@ -260,6 +171,72 @@ class RleTest : public ::testing::Test { return encoded_len; } + int ValidateRleSkip(const vector<int>& values, int bit_width, + int min_repeated_run_length, int skip_at, int skip_count, unsigned int seed=0) { // Encodes 'values', then checks that every decoder reads them back with [skip_at, skip_at + skip_count) skipped. 'seed' only labels failures. Returns the encoded length. + stringstream ss; + ss << "bit_width=" << bit_width + << " min_repeated_run_length_=" << min_repeated_run_length + << " skip_at=" << skip_at + << " skip_count=" << skip_count + << " values.size()=" << values.size() + << " seed=" << seed; + const string& description = ss.str(); + const int len = 64 * 1024; + uint8_t buffer[len]; + + RleEncoder encoder(buffer, len, bit_width, min_repeated_run_length); + int encoded_len = 0; + + for (int i = 0; i < values.size(); ++i) { + bool result = encoder.Put(values[i]); + EXPECT_TRUE(result); + } + encoded_len = encoder.Flush(); + + vector<int> expected_values(values.begin(), values.begin() + skip_at); + for (int i = skip_at + skip_count; i < values.size(); ++i) { + expected_values.push_back(values[i]); + } + + // Verify read. + RleBatchDecoder<uint64_t> per_value_decoder(buffer, len, bit_width); + RleBatchDecoder<uint64_t> per_run_decoder(buffer, len, bit_width); + RleBatchDecoder<uint64_t> batch_decoder(buffer, len, bit_width); + // Ensure it returns the same results after Reset(). + for (int trial = 0; trial < 2; ++trial) { + for (int i = 0; i < skip_at; ++i) { + uint64_t val; + EXPECT_TRUE(per_value_decoder.GetSingleValue(&val)) << description; + EXPECT_EQ(expected_values[i], val) << description << " i=" << i << " trial=" + << trial; + } + EXPECT_EQ(skip_count, per_value_decoder.SkipValues(skip_count)) << description; + for (int i = skip_at; i < expected_values.size(); ++i) { + uint64_t val; + EXPECT_TRUE(per_value_decoder.GetSingleValue(&val)) << description; + EXPECT_EQ(expected_values[i], val) << description << " i=" << i << " trial=" + << trial; + } + // Unpack everything at once from the other decoders.
+ vector<uint64_t> decoded_values1(expected_values.size()); + vector<uint64_t> decoded_values2(expected_values.size()); + EXPECT_TRUE( + GetRleValuesSkip(&per_run_decoder, values.size(), + decoded_values1.data(), skip_at, skip_count)) << description; + EXPECT_TRUE( + GetRleValuesBatchedSkip(&batch_decoder, values.size(), + decoded_values2.data(), skip_at, skip_count)) << description; + for (int i = 0; i < expected_values.size(); ++i) { + EXPECT_EQ(expected_values[i], decoded_values1[i]) << description << " i=" << i; + EXPECT_EQ(expected_values[i], decoded_values2[i]) << description << " i=" << i; + } + per_value_decoder.Reset(buffer, len, bit_width); + per_run_decoder.Reset(buffer, len, bit_width); + batch_decoder.Reset(buffer, len, bit_width); + } + return encoded_len; + } + // ValidateRle on 'num_vals' values with width 'bit_width'. If 'value' != -1, that value // is used, otherwise alternating values are used. void TestRleValues(int bit_width, int num_vals, int value = -1) { @@ -282,23 +259,69 @@ class RleTest : public ::testing::Test { } /// Make a sequence of values. - /// literalLength1: the length of an initial literal sequence. - /// repeatedLength: the length of a repeated sequence. - /// literalLength2: the length of a closing literal sequence. - vector<int>& MakeSequence(vector<int>& values, int intitial_literal_length, - int repeated_length, int trailing_literal_length) { + /// intitial_literal_length: the length of an initial literal sequence. + /// repeated_length: the length of a repeated sequence. + /// trailing_literal_length: the length of a closing literal sequence. + /// bit_width: the bit width of the values; literal values cycle through [0, 2^bit_width).
+ vector<int>& MakeSequenceBitWidth(vector<int>& values, int intitial_literal_length, + int repeated_length, int trailing_literal_length, int bit_width) { + const int64_t mod = 1LL << bit_width; values.clear(); for (int i = 0; i < intitial_literal_length; ++i) { - values.push_back(i % 2); + values.push_back(i % mod); } for (int i = 0; i < repeated_length; ++i) { values.push_back(1); } for (int i = 0; i < trailing_literal_length; ++i) { - values.push_back(i % 2); + values.push_back(i % mod); } return values; } + + /// Same as above with bit width being 1. + vector<int>& MakeSequence(vector<int>& values, int intitial_literal_length, + int repeated_length, int trailing_literal_length) { + return MakeSequenceBitWidth(values, intitial_literal_length, repeated_length, + trailing_literal_length, 1); + } + + /// Generates a sequence that contains repeated and literal runs with random lengths. + /// The sequence has 'total_length' values; each generated run is at most 'max_run_length' long. + /// The random generation is seeded by 'seed' to allow deterministic behavior. + vector<int> MakeRandomSequence(unsigned int seed, int total_length, int max_run_length, + int bit_width) { + std::default_random_engine random_eng(seed); + auto NextRunLength = [&]() { + std::uniform_int_distribution<int> uni_dist(1, max_run_length); + return uni_dist(random_eng); + }; + auto IsNextRunRepeated = [&random_eng]() { + std::uniform_int_distribution<int> uni_dist(0, 1); + return uni_dist(random_eng) == 0; + }; + auto NextVal = [bit_width](int val) { + return static_cast<int>((val + 1) % (1LL << bit_width)); // 1LL: bit_width may be 32, and 1 << 32 on int is undefined. + }; + + vector<int> ret; + int run_length = 0; + int val = 0; + bool is_repeated = false; + while (ret.size() < total_length) { + if (run_length == 0) { + run_length = NextRunLength(); + is_repeated = IsNextRunRepeated(); + val = NextVal(val); + } + ret.push_back(val); + if (!is_repeated) { + val = NextVal(val); + } + --run_length; + } + return ret; + } }; /// Basic test case for literal unpacking - two literals in a run.
@@ -312,6 +335,71 @@ TEST_F(RleTest, TwoLiteralRun) { } } +TEST_F(RleTest, ValueSkipping) { // Deterministic skip ranges, many starting or ending near the section boundaries at 100 and 200. + vector<int> seq; + for (int min_run_length : legal_min_run_lengths_) { + for (int bit_width : {1, 3, 7, 8, 20, 32}) { + MakeSequenceBitWidth(seq, 100, 100, 100, bit_width); // 100 literal values, 100 copies of 1, then 100 literal values. + ValidateRleSkip(seq, bit_width, min_run_length, 0, 7); + ValidateRleSkip(seq, bit_width, min_run_length, 0, 64); + ValidateRleSkip(seq, bit_width, min_run_length, 0, 75); + ValidateRleSkip(seq, bit_width, min_run_length, 0, 100); + ValidateRleSkip(seq, bit_width, min_run_length, 0, 105); + ValidateRleSkip(seq, bit_width, min_run_length, 0, 155); + ValidateRleSkip(seq, bit_width, min_run_length, 0, 200); + ValidateRleSkip(seq, bit_width, min_run_length, 0, 213); + ValidateRleSkip(seq, bit_width, min_run_length, 0, 267); + ValidateRleSkip(seq, bit_width, min_run_length, 0, 300); + ValidateRleSkip(seq, bit_width, min_run_length, 7, 7); + ValidateRleSkip(seq, bit_width, min_run_length, 35, 64); + ValidateRleSkip(seq, bit_width, min_run_length, 55, 75); + ValidateRleSkip(seq, bit_width, min_run_length, 99, 100); + ValidateRleSkip(seq, bit_width, min_run_length, 100, 11); + ValidateRleSkip(seq, bit_width, min_run_length, 101, 55); + ValidateRleSkip(seq, bit_width, min_run_length, 102, 155); + ValidateRleSkip(seq, bit_width, min_run_length, 104, 17); + ValidateRleSkip(seq, bit_width, min_run_length, 122, 178); + ValidateRleSkip(seq, bit_width, min_run_length, 200, 3); + ValidateRleSkip(seq, bit_width, min_run_length, 200, 65); + ValidateRleSkip(seq, bit_width, min_run_length, 203, 17); + ValidateRleSkip(seq, bit_width, min_run_length, 215, 70); + ValidateRleSkip(seq, bit_width, min_run_length, 217, 83); + } + } +} + +// Tests value-skipping on randomly generated input with random skip positions and +// counts. The seed is included in failure messages to help reproduce failures.
+TEST_F(RleTest, ValueSkippingFuzzy) { + const int bitwidth_iteration = 10; // Random bit widths tried per min_run_length. + const int probe_iteration = 100; // Random skip ranges tried per generated sequence. + const int total_sequence_length = 2048; + + std::random_device r; + unsigned int seed = r(); // Non-deterministic; passed to ValidateRleSkip() so failures report it. + std::default_random_engine random_eng(seed); + + // Returns a uniformly distributed random int in ['bottom', 'top'] (both inclusive). + auto GetRandom = [&random_eng](int bottom, int top) { + std::uniform_int_distribution<int> uni_dist(bottom, top); + return uni_dist(random_eng); + }; + + for (int min_run_length : legal_min_run_lengths_) { + for (int i = 0; i < bitwidth_iteration; ++i) { + int bit_width = GetRandom(1, 32); + int max_run_length = GetRandom(5, 200); + vector<int> seq = MakeRandomSequence(seed, total_sequence_length, max_run_length, + bit_width); + for (int j = 0; j < probe_iteration; ++j) { + int skip_at = GetRandom(0, seq.size() - 1); + int skip_count = GetRandom(1, seq.size() - skip_at); // Never skips past the end of 'seq'. + ValidateRleSkip(seq, bit_width, min_run_length, skip_at, skip_count, seed); + } + } + } +} + TEST_F(RleTest, SpecificSequences) { const int len = 1024; uint8_t expected_buffer[len];