PARQUET-494: Implement DictionaryEncoder and test dictionary decoding. I incorporated quite a bit of code from Impala for this patch, but it took a fair amount of additional work to get everything working. In particular, I wasn't happy with the hash table implementation in Impala's `dict-encoder.h`, so I have written a simple new one that we can benchmark and tune as necessary.
The simplest way to pull in the DictEncoder (PARQUET-493) was to also bring in the `MemPool` implementation, suitably trimmed down. We can continue to refactor this as needed for parquet-cpp. I also did some light refactoring using `TYPED_TEST` in `plain-encoding-test` (now `encoding-test`). Author: Wes McKinney <[email protected]> Closes #64 from wesm/PARQUET-494 and squashes the following commits: c634abe [Wes McKinney] Refactor to create TestEncoderBase a3a563a [Wes McKinney] Consolidate dictionary encoding code 2cc4ffe [Wes McKinney] Retrieve type_length() only once in PlainDecoder ctor 20ccd9e [Wes McKinney] Remove DictionaryEncoder shim layer for now dcfc0aa [Wes McKinney] Remove redundant Int96 comparison d98a2c0 [Wes McKinney] Dictionary encoding for booleans throws exception 05414f0 [Wes McKinney] Test dictionary encoding more types 9a5b1a4 [Wes McKinney] Enable include_order linting per PARQUET-539 f3f0efc [Wes McKinney] IWYU cleaning d4191c6 [Wes McKinney] Add header installs, fix clang warning 1347b13 [Wes McKinney] Rename plain-encoding-test to encoding-test 09bf0fa [Wes McKinney] Fix bugs and add dictionary repeats 2e6af48 [Wes McKinney] Fix some bugs. FixedLenByteArray remains to get working. 69b5b69 [Wes McKinney] Refactor test fixtures to be less coupled to state. 
process on getting dict encoding working 6b23716 [Wes McKinney] Create reusable DataType structs for test fixtures and other compile-time type resolution matters 67883fd [Wes McKinney] Bunch of combined work for dict encoding support: Project: http://git-wip-us.apache.org/repos/asf/parquet-cpp/repo Commit: http://git-wip-us.apache.org/repos/asf/parquet-cpp/commit/c6e06929 Tree: http://git-wip-us.apache.org/repos/asf/parquet-cpp/tree/c6e06929 Diff: http://git-wip-us.apache.org/repos/asf/parquet-cpp/diff/c6e06929 Branch: refs/heads/master Commit: c6e069297a3b8d0f9ad45da04fe114d40c593115 Parents: 1df5a26 Author: Wes McKinney <[email protected]> Authored: Fri Feb 26 09:52:46 2016 -0800 Committer: Julien Le Dem <[email protected]> Committed: Fri Feb 26 09:52:46 2016 -0800 ---------------------------------------------------------------------- CMakeLists.txt | 2 +- src/parquet/column/column-reader-test.cc | 1 + src/parquet/column/levels-test.cc | 3 +- src/parquet/column/reader.cc | 5 +- src/parquet/column/scanner-test.cc | 54 ++-- src/parquet/compression/codec-test.cc | 5 +- src/parquet/compression/codec.h | 4 +- src/parquet/compression/lz4-codec.cc | 3 +- src/parquet/compression/snappy-codec.cc | 3 +- src/parquet/encodings/CMakeLists.txt | 2 +- src/parquet/encodings/dictionary-encoding.h | 311 +++++++++++++++++++++- src/parquet/encodings/encoder.h | 5 - src/parquet/encodings/encoding-test.cc | 309 +++++++++++++++++++++ src/parquet/encodings/plain-encoding-test.cc | 232 ---------------- src/parquet/encodings/plain-encoding.h | 96 ++++--- src/parquet/file/reader-internal.h | 2 - src/parquet/reader-test.cc | 3 +- src/parquet/schema/schema-descriptor-test.cc | 3 +- src/parquet/types.h | 36 +++ src/parquet/util/CMakeLists.txt | 16 +- src/parquet/util/bit-stream-utils.h | 2 +- src/parquet/util/bit-util-test.cc | 5 +- src/parquet/util/buffer-builder.h | 61 +++++ src/parquet/util/cpu-info.cc | 10 +- src/parquet/util/dict-encoding.h | 36 +++ src/parquet/util/hash-util.h | 247 
+++++++++++++++++ src/parquet/util/mem-pool-test.cc | 247 +++++++++++++++++ src/parquet/util/mem-pool.cc | 234 ++++++++++++++++ src/parquet/util/mem-pool.h | 208 +++++++++++++++ src/parquet/util/output.h | 1 - src/parquet/util/rle-encoding.h | 2 +- src/parquet/util/rle-test.cc | 9 +- src/parquet/util/sse-util.h | 30 +++ src/parquet/util/stopwatch.h | 5 +- src/parquet/util/test-common.h | 13 +- 35 files changed, 1840 insertions(+), 365 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/c6e06929/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/CMakeLists.txt b/CMakeLists.txt index 218e74a..5ff9e6c 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -267,7 +267,7 @@ if (UNIX) add_custom_target(lint ${BUILD_SUPPORT_DIR}/cpplint.py --verbose=2 --linelength=90 - --filter=-whitespace/comments,-readability/todo,-build/header_guard,-build/include_order,-runtime/references,-readability/check + --filter=-whitespace/comments,-readability/todo,-build/header_guard,-runtime/references,-readability/check `find ${CMAKE_CURRENT_SOURCE_DIR}/src -name \\*.cc -or -name \\*.h | sed -e '/parquet\\/thrift/g'`) endif (UNIX) http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/c6e06929/src/parquet/column/column-reader-test.cc ---------------------------------------------------------------------- diff --git a/src/parquet/column/column-reader-test.cc b/src/parquet/column/column-reader-test.cc index 079201a..e64ef28 100644 --- a/src/parquet/column/column-reader-test.cc +++ b/src/parquet/column/column-reader-test.cc @@ -20,6 +20,7 @@ #include <algorithm> #include <cstdint> #include <cstdlib> +#include <limits> #include <memory> #include <string> #include <vector> http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/c6e06929/src/parquet/column/levels-test.cc ---------------------------------------------------------------------- diff --git 
a/src/parquet/column/levels-test.cc b/src/parquet/column/levels-test.cc index 0e3c20f..57aa562 100644 --- a/src/parquet/column/levels-test.cc +++ b/src/parquet/column/levels-test.cc @@ -15,13 +15,12 @@ // specific language governing permissions and limitations // under the License. +#include <gtest/gtest.h> #include <cstdint> #include <memory> #include <vector> #include <string> -#include <gtest/gtest.h> - #include "parquet/column/levels.h" #include "parquet/types.h" http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/c6e06929/src/parquet/column/reader.cc ---------------------------------------------------------------------- diff --git a/src/parquet/column/reader.cc b/src/parquet/column/reader.cc index 4011347..4cff810 100644 --- a/src/parquet/column/reader.cc +++ b/src/parquet/column/reader.cc @@ -52,8 +52,9 @@ void TypedColumnReader<TYPE>::ConfigureDictionary(const DictionaryPage* page) { // // TODO(wesm): investigate whether this all-or-nothing decoding of the // dictionary makes sense and whether performance can be improved - std::shared_ptr<DecoderType> decoder( - new DictionaryDecoder<TYPE>(descr_, &dictionary)); + + auto decoder = std::make_shared<DictionaryDecoder<TYPE> >(descr_); + decoder->SetDict(&dictionary); decoders_[encoding] = decoder; current_decoder_ = decoders_[encoding].get(); http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/c6e06929/src/parquet/column/scanner-test.cc ---------------------------------------------------------------------- diff --git a/src/parquet/column/scanner-test.cc b/src/parquet/column/scanner-test.cc index be6b42e..785db08 100644 --- a/src/parquet/column/scanner-test.cc +++ b/src/parquet/column/scanner-test.cc @@ -40,16 +40,6 @@ namespace parquet_cpp { using schema::NodePtr; -bool operator==(const Int96& a, const Int96& b) { - return a.value[0] == b.value[0] && - a.value[1] == b.value[1] && - a.value[2] == b.value[2]; -} - -bool operator==(const ByteArray& a, const ByteArray& b) { - return a.len == b.len && 0 == 
memcmp(a.ptr, b.ptr, a.len); -} - static int FLBA_LENGTH = 12; bool operator==(const FixedLenByteArray& a, const FixedLenByteArray& b) { return 0 == memcmp(a.ptr, b.ptr, FLBA_LENGTH); @@ -57,16 +47,10 @@ bool operator==(const FixedLenByteArray& a, const FixedLenByteArray& b) { namespace test { -template <int N> class TypeValue { - public: - static const int value = N; -}; -template <int N> const int TypeValue<N>::value; - -template <typename TYPE> +template <typename Type> class TestFlatScanner : public ::testing::Test { public: - typedef typename type_traits<TYPE::value>::value_type T; + typedef typename Type::c_type T; void InitValues() { random_numbers(num_values_, 0, std::numeric_limits<T>::min(), @@ -106,7 +90,7 @@ class TestFlatScanner : public ::testing::Test { // Create values values_.resize(num_values_); InitValues(); - Paginate<TYPE::value>(d, values_, def_levels_, max_def_level, + Paginate<Type::type_num>(d, values_, def_levels_, max_def_level, rep_levels_, max_rep_level, levels_per_page, values_per_page, pages_); } @@ -116,8 +100,8 @@ class TestFlatScanner : public ::testing::Test { } void CheckResults(int batch_size, const ColumnDescriptor *d) { - TypedScanner<TYPE::value>* scanner = - reinterpret_cast<TypedScanner<TYPE::value>* >(scanner_.get()); + TypedScanner<Type::type_num>* scanner = + reinterpret_cast<TypedScanner<Type::type_num>* >(scanner_.get()); T val; bool is_null; int16_t def_level; @@ -158,14 +142,11 @@ class TestFlatScanner : public ::testing::Test { void InitDescriptors(std::shared_ptr<ColumnDescriptor>& d1, std::shared_ptr<ColumnDescriptor>& d2, std::shared_ptr<ColumnDescriptor>& d3) { NodePtr type; - type = schema::PrimitiveNode::Make("c1", Repetition::REQUIRED, - static_cast<Type::type>(TYPE::value)); + type = schema::PrimitiveNode::Make("c1", Repetition::REQUIRED, Type::type_num); d1.reset(new ColumnDescriptor(type, 0, 0)); - type = schema::PrimitiveNode::Make("c2", Repetition::OPTIONAL, - static_cast<Type::type>(TYPE::value)); + 
type = schema::PrimitiveNode::Make("c2", Repetition::OPTIONAL, Type::type_num); d2.reset(new ColumnDescriptor(type, 4, 0)); - type = schema::PrimitiveNode::Make("c3", Repetition::REPEATED, - static_cast<Type::type>(TYPE::value)); + type = schema::PrimitiveNode::Make("c3", Repetition::REPEATED, Type::type_num); d3.reset(new ColumnDescriptor(type, 4, 2)); } @@ -194,18 +175,18 @@ class TestFlatScanner : public ::testing::Test { }; template<> -void TestFlatScanner<TypeValue<Type::BOOLEAN> >::InitValues() { +void TestFlatScanner<BooleanType>::InitValues() { values_ = flip_coins(num_values_, 0); } template<> -void TestFlatScanner<TypeValue<Type::INT96> >::InitValues() { +void TestFlatScanner<Int96Type>::InitValues() { random_Int96_numbers(num_values_, 0, std::numeric_limits<int32_t>::min(), std::numeric_limits<int32_t>::max(), values_.data()); } template<> -void TestFlatScanner<TypeValue<Type::BYTE_ARRAY> >::InitValues() { +void TestFlatScanner<ByteArrayType>::InitValues() { int max_byte_array_len = 12; int num_bytes = max_byte_array_len + sizeof(uint32_t); size_t nbytes = num_values_ * num_bytes; @@ -215,7 +196,7 @@ void TestFlatScanner<TypeValue<Type::BYTE_ARRAY> >::InitValues() { } template<> -void TestFlatScanner<TypeValue<Type::FIXED_LEN_BYTE_ARRAY> >::InitValues() { +void TestFlatScanner<FLBAType>::InitValues() { size_t nbytes = num_values_ * FLBA_LENGTH; data_buffer_.resize(nbytes); random_fixed_byte_array(num_values_, 0, data_buffer_.data(), FLBA_LENGTH, @@ -223,7 +204,7 @@ void TestFlatScanner<TypeValue<Type::FIXED_LEN_BYTE_ARRAY> >::InitValues() { } template<> -void TestFlatScanner<TypeValue<Type::FIXED_LEN_BYTE_ARRAY> >::InitDescriptors( +void TestFlatScanner<FLBAType>::InitDescriptors( std::shared_ptr<ColumnDescriptor>& d1, std::shared_ptr<ColumnDescriptor>& d2, std::shared_ptr<ColumnDescriptor>& d3) { NodePtr type = schema::PrimitiveNode::MakeFLBA("c1", Repetition::REQUIRED, @@ -237,18 +218,13 @@ void TestFlatScanner<TypeValue<Type::FIXED_LEN_BYTE_ARRAY> 
>::InitDescriptors( d3.reset(new ColumnDescriptor(type, 4, 2)); } -typedef TestFlatScanner<TypeValue<Type::FIXED_LEN_BYTE_ARRAY>> TestFlatFLBAScanner; +typedef TestFlatScanner<FLBAType> TestFlatFLBAScanner; static int num_levels_per_page = 100; static int num_pages = 20; static int batch_size = 32; -typedef ::testing::Types<TypeValue<Type::BOOLEAN>, TypeValue<Type::INT32>, - TypeValue<Type::INT64>, TypeValue<Type::INT96>, TypeValue<Type::FLOAT>, - TypeValue<Type::DOUBLE>, TypeValue<Type::BYTE_ARRAY>, - TypeValue<Type::FIXED_LEN_BYTE_ARRAY> > Primitives; - -TYPED_TEST_CASE(TestFlatScanner, Primitives); +TYPED_TEST_CASE(TestFlatScanner, ParquetTypes); TYPED_TEST(TestFlatScanner, TestScanner) { this->ExecuteAll(num_pages, num_levels_per_page, batch_size); http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/c6e06929/src/parquet/compression/codec-test.cc ---------------------------------------------------------------------- diff --git a/src/parquet/compression/codec-test.cc b/src/parquet/compression/codec-test.cc index 610fb37..285559a 100644 --- a/src/parquet/compression/codec-test.cc +++ b/src/parquet/compression/codec-test.cc @@ -15,14 +15,13 @@ // specific language governing permissions and limitations // under the License. 
+#include <gtest/gtest.h> #include <cstdint> #include <string> #include <vector> -#include <gtest/gtest.h> -#include "parquet/util/test-common.h" - #include "parquet/compression/codec.h" +#include "parquet/util/test-common.h" using std::string; using std::vector; http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/c6e06929/src/parquet/compression/codec.h ---------------------------------------------------------------------- diff --git a/src/parquet/compression/codec.h b/src/parquet/compression/codec.h index bc73f02..df15d61 100644 --- a/src/parquet/compression/codec.h +++ b/src/parquet/compression/codec.h @@ -18,11 +18,11 @@ #ifndef PARQUET_COMPRESSION_CODEC_H #define PARQUET_COMPRESSION_CODEC_H +#include <zlib.h> + #include <cstdint> #include <memory> -#include <zlib.h> - #include "parquet/exception.h" #include "parquet/types.h" http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/c6e06929/src/parquet/compression/lz4-codec.cc ---------------------------------------------------------------------- diff --git a/src/parquet/compression/lz4-codec.cc b/src/parquet/compression/lz4-codec.cc index a131031..81413bb 100644 --- a/src/parquet/compression/lz4-codec.cc +++ b/src/parquet/compression/lz4-codec.cc @@ -15,11 +15,10 @@ // specific language governing permissions and limitations // under the License. -#include "parquet/compression/codec.h" - #include <lz4.h> #include <cstdint> +#include "parquet/compression/codec.h" #include "parquet/exception.h" namespace parquet_cpp { http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/c6e06929/src/parquet/compression/snappy-codec.cc ---------------------------------------------------------------------- diff --git a/src/parquet/compression/snappy-codec.cc b/src/parquet/compression/snappy-codec.cc index 91590db..991dd04 100644 --- a/src/parquet/compression/snappy-codec.cc +++ b/src/parquet/compression/snappy-codec.cc @@ -15,12 +15,11 @@ // specific language governing permissions and limitations // under the License. 
-#include "parquet/compression/codec.h" - #include <snappy.h> #include <cstdint> #include <cstdlib> +#include "parquet/compression/codec.h" #include "parquet/exception.h" namespace parquet_cpp { http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/c6e06929/src/parquet/encodings/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/src/parquet/encodings/CMakeLists.txt b/src/parquet/encodings/CMakeLists.txt index c9349af..eb4cc3c 100644 --- a/src/parquet/encodings/CMakeLists.txt +++ b/src/parquet/encodings/CMakeLists.txt @@ -26,4 +26,4 @@ install(FILES plain-encoding.h DESTINATION include/parquet/encodings) -ADD_PARQUET_TEST(plain-encoding-test) +ADD_PARQUET_TEST(encoding-test) http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/c6e06929/src/parquet/encodings/dictionary-encoding.h ---------------------------------------------------------------------- diff --git a/src/parquet/encodings/dictionary-encoding.h b/src/parquet/encodings/dictionary-encoding.h index b52aefb..eed0659 100644 --- a/src/parquet/encodings/dictionary-encoding.h +++ b/src/parquet/encodings/dictionary-encoding.h @@ -20,10 +20,16 @@ #include <algorithm> #include <cstdint> +#include <iostream> +#include <limits> #include <vector> #include "parquet/encodings/decoder.h" #include "parquet/encodings/encoder.h" +#include "parquet/encodings/plain-encoding.h" +#include "parquet/util/dict-encoding.h" +#include "parquet/util/hash-util.h" +#include "parquet/util/mem-pool.h" #include "parquet/util/rle-encoding.h" namespace parquet_cpp { @@ -36,14 +42,12 @@ class DictionaryDecoder : public Decoder<TYPE> { // Initializes the dictionary with values from 'dictionary'. The data in // dictionary is not guaranteed to persist in memory after this call so the // dictionary decoder needs to copy the data out if necessary. 
- DictionaryDecoder(const ColumnDescriptor* descr, - Decoder<TYPE>* dictionary) + explicit DictionaryDecoder(const ColumnDescriptor* descr) : Decoder<TYPE>(descr, Encoding::RLE_DICTIONARY) { - Init(dictionary); } // Perform type-specific initiatialization - void Init(Decoder<TYPE>* dictionary); + void SetDict(Decoder<TYPE>* dictionary); virtual void SetData(int num_values, const uint8_t* data, int len) { num_values_ = num_values; @@ -83,20 +87,20 @@ class DictionaryDecoder : public Decoder<TYPE> { }; template <int TYPE> -inline void DictionaryDecoder<TYPE>::Init(Decoder<TYPE>* dictionary) { +inline void DictionaryDecoder<TYPE>::SetDict(Decoder<TYPE>* dictionary) { int num_dictionary_values = dictionary->values_left(); dictionary_.resize(num_dictionary_values); dictionary->Decode(&dictionary_[0], num_dictionary_values); } template <> -inline void DictionaryDecoder<Type::BOOLEAN>::Init( +inline void DictionaryDecoder<Type::BOOLEAN>::SetDict( Decoder<Type::BOOLEAN>* dictionary) { ParquetException::NYI("Dictionary encoding is not implemented for boolean values"); } template <> -inline void DictionaryDecoder<Type::BYTE_ARRAY>::Init( +inline void DictionaryDecoder<Type::BYTE_ARRAY>::SetDict( Decoder<Type::BYTE_ARRAY>* dictionary) { int num_dictionary_values = dictionary->values_left(); dictionary_.resize(num_dictionary_values); @@ -116,7 +120,7 @@ inline void DictionaryDecoder<Type::BYTE_ARRAY>::Init( } template <> -inline void DictionaryDecoder<Type::FIXED_LEN_BYTE_ARRAY>::Init( +inline void DictionaryDecoder<Type::FIXED_LEN_BYTE_ARRAY>::SetDict( Decoder<Type::FIXED_LEN_BYTE_ARRAY>* dictionary) { int num_dictionary_values = dictionary->values_left(); dictionary_.resize(num_dictionary_values); @@ -134,6 +138,297 @@ inline void DictionaryDecoder<Type::FIXED_LEN_BYTE_ARRAY>::Init( } } +// ---------------------------------------------------------------------- +// Dictionary encoder + +// Initially imported from Apache Impala on 2016-02-22, and has been modified +// since 
for parquet-cpp + +// Initially 1024 elements +static constexpr int INITIAL_HASH_TABLE_SIZE = 1 << 10; + +typedef int32_t hash_slot_t; +static constexpr hash_slot_t HASH_SLOT_EMPTY = std::numeric_limits<int32_t>::max(); + +// The maximum load factor for the hash table before resizing. +static constexpr double MAX_HASH_LOAD = 0.7; + +/// See the dictionary encoding section of https://github.com/Parquet/parquet-format. +/// The encoding supports streaming encoding. Values are encoded as they are added while +/// the dictionary is being constructed. At any time, the buffered values can be +/// written out with the current dictionary size. More values can then be added to +/// the encoder, including new dictionary entries. +class DictEncoderBase { + public: + virtual ~DictEncoderBase() { + DCHECK(buffered_indices_.empty()); + } + + /// Writes out the encoded dictionary to buffer. buffer must be preallocated to + /// dict_encoded_size() bytes. + virtual void WriteDict(uint8_t* buffer) = 0; + + /// The number of entries in the dictionary. + virtual int num_entries() const = 0; + + /// Clears all the indices (but leaves the dictionary). + void ClearIndices() { buffered_indices_.clear(); } + + /// Returns a conservative estimate of the number of bytes needed to encode the buffered + /// indices. Used to size the buffer passed to WriteIndices(). + int EstimatedDataEncodedSize() { + return 1 + RleEncoder::MaxBufferSize(bit_width(), buffered_indices_.size()); + } + + /// The minimum bit width required to encode the currently buffered indices. + int bit_width() const { + if (UNLIKELY(num_entries() == 0)) return 0; + if (UNLIKELY(num_entries() == 1)) return 1; + return BitUtil::Log2(num_entries()); + } + + /// Writes out any buffered indices to buffer preceded by the bit width of this data. + /// Returns the number of bytes written. + /// If the supplied buffer is not big enough, returns -1. + /// buffer must be preallocated with buffer_len bytes. 
Use EstimatedDataEncodedSize() + /// to size buffer. + int WriteIndices(uint8_t* buffer, int buffer_len); + + int hash_table_size() { return hash_table_size_; } + int dict_encoded_size() { return dict_encoded_size_; } + + protected: + explicit DictEncoderBase(MemPool* pool) : + hash_table_size_(INITIAL_HASH_TABLE_SIZE), + mod_bitmask_(hash_table_size_ - 1), + hash_slots_(hash_table_size_, HASH_SLOT_EMPTY), + pool_(pool), + dict_encoded_size_(0) {} + + /// Size of the table. Must be a power of 2. + int hash_table_size_; + + // Store hash_table_size_ - 1, so that j & mod_bitmask_ is equivalent to j % + // hash_table_size_, but uses far fewer CPU cycles + int mod_bitmask_; + + // We use a fixed-size hash table with linear probing + // + // These values correspond to the uniques_ array + std::vector<hash_slot_t> hash_slots_; + + // For ByteArray / FixedLenByteArray data. Not owned + MemPool* pool_; + + /// Indices that have not yet be written out by WriteIndices(). + std::vector<int> buffered_indices_; + + /// The number of bytes needed to encode the dictionary. + int dict_encoded_size_; +}; + +template <typename T> +class DictEncoder : public DictEncoderBase { + public: + explicit DictEncoder(MemPool* pool = nullptr, int type_length = -1) : + DictEncoderBase(pool), + type_length_(type_length) { } + + // TODO(wesm): think about how to address the construction semantics in + // encodings/dictionary-encoding.h + void set_mem_pool(MemPool* pool) { + pool_ = pool; + } + + void set_type_length(int type_length) { + type_length_ = type_length; + } + + /// Encode value. Note that this does not actually write any data, just + /// buffers the value's index to be written later. 
+ void Put(const T& value); + + virtual void WriteDict(uint8_t* buffer); + + virtual int num_entries() const { return uniques_.size(); } + + private: + // The unique observed values + std::vector<T> uniques_; + + bool SlotDifferent(const T& v, hash_slot_t slot); + void DoubleTableSize(); + + /// Size of each encoded dictionary value. -1 for variable-length types. + int type_length_; + + /// Hash function for mapping a value to a bucket. + inline uint32_t Hash(const T& value) const; + + /// Adds value to the hash table and updates dict_encoded_size_ + void AddDictKey(const T& value); +}; + +template<typename T> +inline uint32_t DictEncoder<T>::Hash(const T& value) const { + return HashUtil::Hash(&value, sizeof(value), 0); +} + +template<> +inline uint32_t DictEncoder<ByteArray>::Hash(const ByteArray& value) const { + return HashUtil::Hash(value.ptr, value.len, 0); +} + +template<> +inline uint32_t DictEncoder<FixedLenByteArray>::Hash( + const FixedLenByteArray& value) const { + return HashUtil::Hash(value.ptr, type_length_, 0); +} + +template <typename T> +inline bool DictEncoder<T>::SlotDifferent(const T& v, hash_slot_t slot) { + return v != uniques_[slot]; +} + +template <> +inline bool DictEncoder<FixedLenByteArray>::SlotDifferent( + const FixedLenByteArray& v, hash_slot_t slot) { + return 0 != memcmp(v.ptr, uniques_[slot].ptr, type_length_); +} + +template <typename T> +inline void DictEncoder<T>::Put(const T& v) { + uint32_t j = Hash(v) & mod_bitmask_; + hash_slot_t index = hash_slots_[j]; + + // Find an empty slot + while (HASH_SLOT_EMPTY != index && SlotDifferent(v, index)) { + // Linear probing + ++j; + if (j == hash_table_size_) j = 0; + index = hash_slots_[j]; + } + + int bytes_added = 0; + if (index == HASH_SLOT_EMPTY) { + // Not in the hash table, so we insert it now + index = uniques_.size(); + hash_slots_[j] = index; + AddDictKey(v); + + if (UNLIKELY(uniques_.size() > + static_cast<size_t>(hash_table_size_ * MAX_HASH_LOAD))) { + DoubleTableSize(); + } 
+ } + + buffered_indices_.push_back(index); +} + +template <typename T> +inline void DictEncoder<T>::DoubleTableSize() { + int new_size = hash_table_size_ * 2; + std::vector<hash_slot_t> new_hash_slots(new_size, HASH_SLOT_EMPTY); + hash_slot_t index, slot; + uint32_t j; + for (int i = 0; i < hash_table_size_; ++i) { + index = hash_slots_[i]; + + if (index == HASH_SLOT_EMPTY) { + continue; + } + + // Compute the hash value mod the new table size to start looking for an + // empty slot + const T& v = uniques_[index]; + + // Find an empty slot in the new hash table + j = Hash(v) & (new_size - 1); + slot = new_hash_slots[j]; + while (HASH_SLOT_EMPTY != slot && SlotDifferent(v, slot)) { + ++j; + if (j == new_size) j = 0; + slot = new_hash_slots[j]; + } + + // Copy the old slot index to the new hash table + new_hash_slots[j] = index; + } + + hash_table_size_ = new_size; + mod_bitmask_ = new_size - 1; + new_hash_slots.swap(hash_slots_); +} + +template<typename T> +inline void DictEncoder<T>::AddDictKey(const T& v) { + uniques_.push_back(v); + dict_encoded_size_ += sizeof(T); +} + +template<> +inline void DictEncoder<ByteArray>::AddDictKey(const ByteArray& v) { + uint8_t* heap = pool_->Allocate(v.len); + if (UNLIKELY(v.len > 0 && heap == nullptr)) { + throw ParquetException("out of memory"); + } + memcpy(heap, v.ptr, v.len); + + uniques_.push_back(ByteArray(v.len, heap)); + dict_encoded_size_ += v.len + sizeof(uint32_t); +} + +template<> +inline void DictEncoder<FixedLenByteArray>::AddDictKey(const FixedLenByteArray& v) { + uint8_t* heap = pool_->Allocate(type_length_); + if (UNLIKELY(type_length_ > 0 && heap == nullptr)) { + throw ParquetException("out of memory"); + } + memcpy(heap, v.ptr, type_length_); + + uniques_.push_back(FixedLenByteArray(heap)); + dict_encoded_size_ += type_length_; +} + +template <typename T> +inline void DictEncoder<T>::WriteDict(uint8_t* buffer) { + // For primitive types, only a memcpy + memcpy(buffer, &uniques_[0], sizeof(T) * 
uniques_.size()); +} + +// ByteArray and FLBA already have the dictionary encoded in their data heaps +template <> +inline void DictEncoder<ByteArray>::WriteDict(uint8_t* buffer) { + for (const ByteArray& v : uniques_) { + memcpy(buffer, reinterpret_cast<const void*>(&v.len), sizeof(uint32_t)); + buffer += sizeof(uint32_t); + memcpy(buffer, v.ptr, v.len); + buffer += v.len; + } +} + +template <> +inline void DictEncoder<FixedLenByteArray>::WriteDict(uint8_t* buffer) { + for (const FixedLenByteArray& v : uniques_) { + memcpy(buffer, v.ptr, type_length_); + buffer += type_length_; + } +} + +inline int DictEncoderBase::WriteIndices(uint8_t* buffer, int buffer_len) { + // Write bit width in first byte + *buffer = bit_width(); + ++buffer; + --buffer_len; + + RleEncoder encoder(buffer, buffer_len, bit_width()); + for (int index : buffered_indices_) { + if (!encoder.Put(index)) return -1; + } + encoder.Flush(); + return 1 + encoder.len(); +} + } // namespace parquet_cpp #endif http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/c6e06929/src/parquet/encodings/encoder.h ---------------------------------------------------------------------- diff --git a/src/parquet/encodings/encoder.h b/src/parquet/encodings/encoder.h index 50ba48f..ce91a29 100644 --- a/src/parquet/encodings/encoder.h +++ b/src/parquet/encodings/encoder.h @@ -39,11 +39,6 @@ class Encoder { virtual ~Encoder() {} - // Subclasses should override the ones they support - virtual void Encode(const T* src, int num_values, OutputStream* dst) { - throw ParquetException("Encoder does not implement this type."); - } - const Encoding::type encoding() const { return encoding_; } protected: http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/c6e06929/src/parquet/encodings/encoding-test.cc ---------------------------------------------------------------------- diff --git a/src/parquet/encodings/encoding-test.cc b/src/parquet/encodings/encoding-test.cc new file mode 100644 index 0000000..10310ed --- /dev/null +++ 
b/src/parquet/encodings/encoding-test.cc @@ -0,0 +1,309 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include <gtest/gtest.h> +#include <cstdint> +#include <cstdlib> +#include <cstring> +#include <string> +#include <vector> + +#include "parquet/schema/descriptor.h" +#include "parquet/encodings/dictionary-encoding.h" +#include "parquet/encodings/plain-encoding.h" +#include "parquet/types.h" +#include "parquet/schema/types.h" +#include "parquet/util/bit-util.h" +#include "parquet/util/buffer.h" +#include "parquet/util/dict-encoding.h" +#include "parquet/util/output.h" +#include "parquet/util/test-common.h" + +using std::string; +using std::vector; + +namespace parquet_cpp { + +namespace test { + +TEST(VectorBooleanTest, TestEncodeDecode) { + // PARQUET-454 + int nvalues = 10000; + int nbytes = BitUtil::Ceil(nvalues, 8); + + // seed the prng so failure is deterministic + vector<bool> draws = flip_coins_seed(nvalues, 0.5, 0); + + PlainEncoder<Type::BOOLEAN> encoder(nullptr); + PlainDecoder<Type::BOOLEAN> decoder(nullptr); + + InMemoryOutputStream dst; + encoder.Encode(draws, nvalues, &dst); + + std::shared_ptr<Buffer> encode_buffer = dst.GetBuffer(); + ASSERT_EQ(nbytes, encode_buffer->size()); + + 
vector<uint8_t> decode_buffer(nbytes); + const uint8_t* decode_data = &decode_buffer[0]; + + decoder.SetData(nvalues, encode_buffer->data(), encode_buffer->size()); + int values_decoded = decoder.Decode(&decode_buffer[0], nvalues); + ASSERT_EQ(nvalues, values_decoded); + + for (int i = 0; i < nvalues; ++i) { + ASSERT_EQ(draws[i], BitUtil::GetArrayBit(decode_data, i)) << i; + } +} + +// ---------------------------------------------------------------------- +// test data generation + +template <typename T> +void GenerateData(int num_values, T* out, vector<uint8_t>* heap) { + // seed the prng so failure is deterministic + random_numbers(num_values, 0, std::numeric_limits<T>::min(), + std::numeric_limits<T>::max(), out); +} + +template <> +void GenerateData<bool>(int num_values, bool* out, vector<uint8_t>* heap) { + // seed the prng so failure is deterministic + random_bools(num_values, 0.5, 0, out); +} + +template <> +void GenerateData<Int96>(int num_values, Int96* out, vector<uint8_t>* heap) { + // seed the prng so failure is deterministic + random_Int96_numbers(num_values, 0, std::numeric_limits<int32_t>::min(), + std::numeric_limits<int32_t>::max(), out); +} + +template <> +void GenerateData<ByteArray>(int num_values, ByteArray* out, vector<uint8_t>* heap) { + // seed the prng so failure is deterministic + int max_byte_array_len = 12; + int num_bytes = max_byte_array_len + sizeof(uint32_t); + heap->resize(num_values * max_byte_array_len); + random_byte_array(num_values, 0, heap->data(), out, 2, max_byte_array_len); +} + +static int flba_length = 8; + +template <> +void GenerateData<FLBA>(int num_values, FLBA* out, vector<uint8_t>* heap) { + // seed the prng so failure is deterministic + heap->resize(num_values * flba_length); + random_fixed_byte_array(num_values, 0, heap->data(), flba_length, out); +} + +template <typename T> +void VerifyResults(T* result, T* expected, int num_values) { + for (int i = 0; i < num_values; ++i) { + ASSERT_EQ(expected[i], result[i]) << 
i; + } +} + +template <> +void VerifyResults<FLBA>(FLBA* result, FLBA* expected, int num_values) { + for (int i = 0; i < num_values; ++i) { + ASSERT_EQ(0, memcmp(expected[i].ptr, result[i].ptr, flba_length)) << i; + } +} + +// ---------------------------------------------------------------------- +// Create some column descriptors + +template <typename T> +std::shared_ptr<ColumnDescriptor> ExampleDescr() { + return nullptr; +} + +template <> +std::shared_ptr<ColumnDescriptor> ExampleDescr<FLBA>() { + auto node = schema::PrimitiveNode::MakeFLBA("name", Repetition::OPTIONAL, + flba_length, LogicalType::UTF8); + return std::make_shared<ColumnDescriptor>(node, 0, 0); +} + +// ---------------------------------------------------------------------- +// Plain encoding tests + +template <typename Type> +class TestEncodingBase : public ::testing::Test { + public: + typedef typename Type::c_type T; + static constexpr int TYPE = Type::type_num; + + void SetUp() { + descr_ = ExampleDescr<T>(); + if (descr_) { + type_length_ = descr_->type_length(); + } + } + + void InitData(int nvalues, int repeats) { + num_values_ = nvalues * repeats; + input_bytes_.resize(num_values_ * sizeof(T)); + output_bytes_.resize(num_values_ * sizeof(T)); + draws_ = reinterpret_cast<T*>(input_bytes_.data()); + decode_buf_ = reinterpret_cast<T*>(output_bytes_.data()); + GenerateData<T>(nvalues, draws_, &data_buffer_); + + // add some repeated values + for (int j = 1; j < repeats; ++j) { + for (int i = 0; i < nvalues; ++i) { + draws_[nvalues * j + i] = draws_[i]; + } + } + } + + virtual void CheckRoundtrip() = 0; + + void Execute(int nvalues, int repeats) { + InitData(nvalues, repeats); + CheckRoundtrip(); + } + + protected: + MemPool pool_; + + int num_values_; + int type_length_; + T* draws_; + T* decode_buf_; + vector<uint8_t> input_bytes_; + vector<uint8_t> output_bytes_; + vector<uint8_t> data_buffer_; + + std::shared_ptr<Buffer> encode_buffer_; + std::shared_ptr<ColumnDescriptor> descr_; +}; + +// 
Member variables are not visible to templated subclasses. Possibly figure +// out an alternative to this class layering at some point +#define USING_BASE_MEMBERS() \ + using TestEncodingBase<Type>::pool_; \ + using TestEncodingBase<Type>::descr_; \ + using TestEncodingBase<Type>::num_values_; \ + using TestEncodingBase<Type>::draws_; \ + using TestEncodingBase<Type>::data_buffer_; \ + using TestEncodingBase<Type>::type_length_; \ + using TestEncodingBase<Type>::encode_buffer_; \ + using TestEncodingBase<Type>::decode_buf_; + + +template <typename Type> +class TestPlainEncoding : public TestEncodingBase<Type> { + public: + typedef typename Type::c_type T; + static constexpr int TYPE = Type::type_num; + + virtual void CheckRoundtrip() { + PlainEncoder<TYPE> encoder(descr_.get()); + PlainDecoder<TYPE> decoder(descr_.get()); + InMemoryOutputStream dst; + encoder.Encode(draws_, num_values_, &dst); + + encode_buffer_ = dst.GetBuffer(); + + decoder.SetData(num_values_, encode_buffer_->data(), + encode_buffer_->size()); + int values_decoded = decoder.Decode(decode_buf_, num_values_); + ASSERT_EQ(num_values_, values_decoded); + VerifyResults<T>(decode_buf_, draws_, num_values_); + } + + protected: + USING_BASE_MEMBERS(); +}; + +TYPED_TEST_CASE(TestPlainEncoding, ParquetTypes); + +TYPED_TEST(TestPlainEncoding, BasicRoundTrip) { + this->Execute(10000, 1); +} + +// ---------------------------------------------------------------------- +// Dictionary encoding tests + +typedef ::testing::Types<Int32Type, Int64Type, Int96Type, FloatType, DoubleType, + ByteArrayType, FLBAType> DictEncodedTypes; + +template <typename Type> +class TestDictionaryEncoding : public TestEncodingBase<Type> { + public: + typedef typename Type::c_type T; + static constexpr int TYPE = Type::type_num; + + void CheckRoundtrip() { + DictEncoder<T> encoder(&pool_, type_length_); + + dict_buffer_ = std::make_shared<OwnedMutableBuffer>(); + auto indices = std::make_shared<OwnedMutableBuffer>(); + + 
ASSERT_NO_THROW( + { + for (int i = 0; i < num_values_; ++i) { + encoder.Put(draws_[i]); + } + }); + dict_buffer_->Resize(encoder.dict_encoded_size()); + encoder.WriteDict(dict_buffer_->mutable_data()); + + indices->Resize(encoder.EstimatedDataEncodedSize()); + int actual_bytes = encoder.WriteIndices(indices->mutable_data(), + indices->size()); + indices->Resize(actual_bytes); + + PlainDecoder<TYPE> dict_decoder(descr_.get()); + dict_decoder.SetData(encoder.num_entries(), dict_buffer_->data(), + dict_buffer_->size()); + + DictionaryDecoder<TYPE> decoder(descr_.get()); + decoder.SetDict(&dict_decoder); + + decoder.SetData(num_values_, indices->data(), indices->size()); + int values_decoded = decoder.Decode(decode_buf_, num_values_); + ASSERT_EQ(num_values_, values_decoded); + + // TODO(wesm): The DictionaryDecoder must stay alive because the decoded + // values' data is owned by a buffer inside the DictionaryEncoder. We + // should revisit when data lifetime is reviewed more generally. + VerifyResults<T>(decode_buf_, draws_, num_values_); + } + + protected: + USING_BASE_MEMBERS(); + std::shared_ptr<OwnedMutableBuffer> dict_buffer_; +}; + +TYPED_TEST_CASE(TestDictionaryEncoding, DictEncodedTypes); + +TYPED_TEST(TestDictionaryEncoding, BasicRoundTrip) { + this->Execute(2500, 2); +} + +TEST(TestDictionaryEncoding, CannotDictDecodeBoolean) { + PlainDecoder<Type::BOOLEAN> dict_decoder(nullptr); + DictionaryDecoder<Type::BOOLEAN> decoder(nullptr); + + ASSERT_THROW(decoder.SetDict(&dict_decoder), ParquetException); +} + +} // namespace test + +} // namespace parquet_cpp http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/c6e06929/src/parquet/encodings/plain-encoding-test.cc ---------------------------------------------------------------------- diff --git a/src/parquet/encodings/plain-encoding-test.cc b/src/parquet/encodings/plain-encoding-test.cc deleted file mode 100644 index 7ebd21f..0000000 --- a/src/parquet/encodings/plain-encoding-test.cc +++ /dev/null @@ -1,232 
+0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -#include <cstdint> -#include <cstdlib> -#include <cstring> -#include <string> -#include <vector> - -#include <gtest/gtest.h> - -#include "parquet/schema/descriptor.h" -#include "parquet/encodings/plain-encoding.h" -#include "parquet/types.h" -#include "parquet/schema/types.h" -#include "parquet/util/bit-util.h" -#include "parquet/util/buffer.h" -#include "parquet/util/output.h" -#include "parquet/util/test-common.h" - -using std::string; -using std::vector; - -namespace parquet_cpp { - -namespace test { - -TEST(VectorBooleanTest, TestEncodeDecode) { - // PARQUET-454 - size_t nvalues = 10000; - size_t nbytes = BitUtil::Ceil(nvalues, 8); - - // seed the prng so failure is deterministic - vector<bool> draws = flip_coins_seed(nvalues, 0.5, 0); - - PlainEncoder<Type::BOOLEAN> encoder(nullptr); - PlainDecoder<Type::BOOLEAN> decoder(nullptr); - - InMemoryOutputStream dst; - encoder.Encode(draws, nvalues, &dst); - - std::shared_ptr<Buffer> encode_buffer = dst.GetBuffer(); - ASSERT_EQ(nbytes, encode_buffer->size()); - - vector<uint8_t> decode_buffer(nbytes); - const uint8_t* decode_data = &decode_buffer[0]; - - decoder.SetData(nvalues, encode_buffer->data(), 
encode_buffer->size()); - size_t values_decoded = decoder.Decode(&decode_buffer[0], nvalues); - ASSERT_EQ(nvalues, values_decoded); - - for (size_t i = 0; i < nvalues; ++i) { - ASSERT_EQ(draws[i], BitUtil::GetArrayBit(decode_data, i)) << i; - } -} - -template<typename T, int TYPE> -class EncodeDecode{ - public: - void init_data(int nvalues) { - num_values_ = nvalues; - input_bytes_.resize(num_values_ * sizeof(T)); - output_bytes_.resize(num_values_ * sizeof(T)); - draws_ = reinterpret_cast<T*>(input_bytes_.data()); - decode_buf_ = reinterpret_cast<T*>(output_bytes_.data()); - } - - void generate_data() { - // seed the prng so failure is deterministic - random_numbers(num_values_, 0, std::numeric_limits<T>::min(), - std::numeric_limits<T>::max(), draws_); - } - - void encode_decode(ColumnDescriptor *d) { - PlainEncoder<TYPE> encoder(d); - PlainDecoder<TYPE> decoder(d); - - InMemoryOutputStream dst; - encoder.Encode(draws_, num_values_, &dst); - - encode_buffer_ = dst.GetBuffer(); - - decoder.SetData(num_values_, encode_buffer_->data(), - encode_buffer_->size()); - size_t values_decoded = decoder.Decode(decode_buf_, num_values_); - ASSERT_EQ(num_values_, values_decoded); - } - - void verify_results() { - for (size_t i = 0; i < num_values_; ++i) { - ASSERT_EQ(draws_[i], decode_buf_[i]) << i; - } - } - - void execute(int nvalues, ColumnDescriptor *d) { - init_data(nvalues); - generate_data(); - encode_decode(d); - verify_results(); - } - - private: - int num_values_; - T* draws_; - T* decode_buf_; - vector<uint8_t> input_bytes_; - vector<uint8_t> output_bytes_; - vector<uint8_t> data_buffer_; - - std::shared_ptr<Buffer> encode_buffer_; -}; - -template<> -void EncodeDecode<bool, Type::BOOLEAN>::generate_data() { - // seed the prng so failure is deterministic - random_bools(num_values_, 0.5, 0, draws_); -} - -template<> -void EncodeDecode<Int96, Type::INT96>::generate_data() { - // seed the prng so failure is deterministic - random_Int96_numbers(num_values_, 0, 
std::numeric_limits<int32_t>::min(), - std::numeric_limits<int32_t>::max(), draws_); -} - -template<> -void EncodeDecode<Int96, Type::INT96>::verify_results() { - for (size_t i = 0; i < num_values_; ++i) { - ASSERT_EQ(draws_[i].value[0], decode_buf_[i].value[0]) << i; - ASSERT_EQ(draws_[i].value[1], decode_buf_[i].value[1]) << i; - ASSERT_EQ(draws_[i].value[2], decode_buf_[i].value[2]) << i; - } -} - -template<> -void EncodeDecode<ByteArray, Type::BYTE_ARRAY>::generate_data() { - // seed the prng so failure is deterministic - int max_byte_array_len = 12; - int num_bytes = max_byte_array_len + sizeof(uint32_t); - size_t nbytes = num_values_ * num_bytes; - data_buffer_.resize(nbytes); - random_byte_array(num_values_, 0, data_buffer_.data(), draws_, - max_byte_array_len); -} - -template<> -void EncodeDecode<ByteArray, Type::BYTE_ARRAY>::verify_results() { - for (size_t i = 0; i < num_values_; ++i) { - ASSERT_EQ(draws_[i].len, decode_buf_[i].len) << i; - ASSERT_EQ(0, memcmp(draws_[i].ptr, decode_buf_[i].ptr, draws_[i].len)) << i; - } -} - -static int flba_length = 8; -template<> -void EncodeDecode<FLBA, Type::FIXED_LEN_BYTE_ARRAY>::generate_data() { - // seed the prng so failure is deterministic - size_t nbytes = num_values_ * flba_length; - data_buffer_.resize(nbytes); - ASSERT_EQ(nbytes, data_buffer_.size()); - random_fixed_byte_array(num_values_, 0, data_buffer_.data(), flba_length, draws_); -} - -template<> -void EncodeDecode<FLBA, Type::FIXED_LEN_BYTE_ARRAY>::verify_results() { - for (size_t i = 0; i < num_values_; ++i) { - ASSERT_EQ(0, memcmp(draws_[i].ptr, decode_buf_[i].ptr, flba_length)) << i; - } -} - -int num_values = 10000; - -TEST(BoolEncodeDecode, TestEncodeDecode) { - EncodeDecode<bool, Type::BOOLEAN> obj; - obj.execute(num_values, nullptr); -} - -TEST(Int32EncodeDecode, TestEncodeDecode) { - EncodeDecode<int32_t, Type::INT32> obj; - obj.execute(num_values, nullptr); -} - -TEST(Int64EncodeDecode, TestEncodeDecode) { - EncodeDecode<int64_t, Type::INT64> 
obj; - obj.execute(num_values, nullptr); -} - -TEST(FloatEncodeDecode, TestEncodeDecode) { - EncodeDecode<float, Type::FLOAT> obj; - obj.execute(num_values, nullptr); -} - -TEST(DoubleEncodeDecode, TestEncodeDecode) { - EncodeDecode<double, Type::DOUBLE> obj; - obj.execute(num_values, nullptr); -} - -TEST(Int96EncodeDecode, TestEncodeDecode) { - EncodeDecode<Int96, Type::INT96> obj; - obj.execute(num_values, nullptr); -} - -TEST(BAEncodeDecode, TestEncodeDecode) { - EncodeDecode<ByteArray, Type::BYTE_ARRAY> obj; - obj.execute(num_values, nullptr); -} - -TEST(FLBAEncodeDecode, TestEncodeDecode) { - schema::NodePtr node; - node = schema::PrimitiveNode::MakeFLBA("name", Repetition::OPTIONAL, - flba_length, LogicalType::UTF8); - ColumnDescriptor d(node, 0, 0); - EncodeDecode<FixedLenByteArray, Type::FIXED_LEN_BYTE_ARRAY> obj; - obj.execute(num_values, &d); -} - -} // namespace test -} // namespace parquet_cpp http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/c6e06929/src/parquet/encodings/plain-encoding.h ---------------------------------------------------------------------- diff --git a/src/parquet/encodings/plain-encoding.h b/src/parquet/encodings/plain-encoding.h index 83ee40c..9adabdf 100644 --- a/src/parquet/encodings/plain-encoding.h +++ b/src/parquet/encodings/plain-encoding.h @@ -40,7 +40,13 @@ class PlainDecoder : public Decoder<TYPE> { explicit PlainDecoder(const ColumnDescriptor* descr) : Decoder<TYPE>(descr, Encoding::PLAIN), - data_(NULL), len_(0) {} + data_(NULL), len_(0) { + if (descr_ && descr_->physical_type() == Type::FIXED_LEN_BYTE_ARRAY) { + type_length_ = descr_->type_length(); + } else { + type_length_ = -1; + } + } virtual void SetData(int num_values, const uint8_t* data, int len) { num_values_ = num_values; @@ -49,55 +55,69 @@ class PlainDecoder : public Decoder<TYPE> { } virtual int Decode(T* buffer, int max_values); + private: + using Decoder<TYPE>::descr_; const uint8_t* data_; int len_; + int type_length_; }; -template <int TYPE> 
-inline int PlainDecoder<TYPE>::Decode(T* buffer, int max_values) { - max_values = std::min(max_values, num_values_); - int size = max_values * sizeof(T); - if (len_ < size) ParquetException::EofException(); - memcpy(buffer, data_, size); - data_ += size; - len_ -= size; - num_values_ -= max_values; - return max_values; +// Decode routine templated on C++ type rather than type enum +template <typename T> +inline int DecodePlain(const uint8_t* data, int64_t data_size, int num_values, + int type_length, T* out) { + int bytes_to_decode = num_values * sizeof(T); + if (data_size < bytes_to_decode) { + ParquetException::EofException(); + } + memcpy(out, data, bytes_to_decode); + return bytes_to_decode; } -// Template specialization for BYTE_ARRAY -// BA does not currently own its data -// the lifetime is tied to the input stream +// Template specialization for BYTE_ARRAY. The written values do not own their +// own data. template <> -inline int PlainDecoder<Type::BYTE_ARRAY>::Decode(ByteArray* buffer, - int max_values) { - max_values = std::min(max_values, num_values_); - for (int i = 0; i < max_values; ++i) { - uint32_t len = buffer[i].len = *reinterpret_cast<const uint32_t*>(data_); - if (len_ < sizeof(uint32_t) + len) ParquetException::EofException(); - buffer[i].ptr = data_ + sizeof(uint32_t); - data_ += sizeof(uint32_t) + len; - len_ -= sizeof(uint32_t) + len; +inline int DecodePlain<ByteArray>(const uint8_t* data, int64_t data_size, int num_values, + int type_length, ByteArray* out) { + int bytes_decoded = 0; + int increment; + for (int i = 0; i < num_values; ++i) { + uint32_t len = out[i].len = *reinterpret_cast<const uint32_t*>(data); + increment = sizeof(uint32_t) + len; + if (data_size < increment) ParquetException::EofException(); + out[i].ptr = data + sizeof(uint32_t); + data += increment; + data_size -= increment; + bytes_decoded += increment; } - num_values_ -= max_values; - return max_values; + return bytes_decoded; } -// Template specialization for 
FIXED_LEN_BYTE_ARRAY -// FLBA does not currently own its data -// the lifetime is tied to the input stream +// Template specialization for FIXED_LEN_BYTE_ARRAY. The written values do not +// own their own data. template <> -inline int PlainDecoder<Type::FIXED_LEN_BYTE_ARRAY>::Decode( - FixedLenByteArray* buffer, int max_values) { - max_values = std::min(max_values, num_values_); - int len = descr_->type_length(); - for (int i = 0; i < max_values; ++i) { - if (len_ < len) ParquetException::EofException(); - buffer[i].ptr = data_; - data_ += len; - len_ -= len; +inline int DecodePlain<FixedLenByteArray>(const uint8_t* data, int64_t data_size, + int num_values, int type_length, FixedLenByteArray* out) { + int bytes_to_decode = type_length * num_values; + if (data_size < bytes_to_decode) { + ParquetException::EofException(); } + for (int i = 0; i < num_values; ++i) { + out[i].ptr = data; + data += type_length; + data_size -= type_length; + } + return bytes_to_decode; +} + +template <int TYPE> +inline int PlainDecoder<TYPE>::Decode(T* buffer, int max_values) { + max_values = std::min(max_values, num_values_); + int bytes_consumed = DecodePlain<T>(data_, len_, max_values, + type_length_, buffer); + data_ += bytes_consumed; + len_ -= bytes_consumed; num_values_ -= max_values; return max_values; } @@ -155,7 +175,7 @@ class PlainEncoder : public Encoder<TYPE> { explicit PlainEncoder(const ColumnDescriptor* descr) : Encoder<TYPE>(descr, Encoding::PLAIN) {} - virtual void Encode(const T* src, int num_values, OutputStream* dst); + void Encode(const T* src, int num_values, OutputStream* dst); }; template <> http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/c6e06929/src/parquet/file/reader-internal.h ---------------------------------------------------------------------- diff --git a/src/parquet/file/reader-internal.h b/src/parquet/file/reader-internal.h index 7aff74a..08d4607 100644 --- a/src/parquet/file/reader-internal.h +++ b/src/parquet/file/reader-internal.h @@ -31,8 
+31,6 @@ namespace parquet_cpp { -class SchemaDescriptor; - // 16 MB is the default maximum page header size static constexpr uint32_t DEFAULT_MAX_PAGE_HEADER_SIZE = 16 * 1024 * 1024; http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/c6e06929/src/parquet/reader-test.cc ---------------------------------------------------------------------- diff --git a/src/parquet/reader-test.cc b/src/parquet/reader-test.cc index be22e5a..e99140c 100644 --- a/src/parquet/reader-test.cc +++ b/src/parquet/reader-test.cc @@ -15,14 +15,13 @@ // specific language governing permissions and limitations // under the License. +#include <gtest/gtest.h> #include <cstdlib> #include <cstdint> #include <iostream> #include <memory> #include <string> -#include <gtest/gtest.h> - #include "parquet/file/reader.h" #include "parquet/column/reader.h" #include "parquet/column/scanner.h" http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/c6e06929/src/parquet/schema/schema-descriptor-test.cc ---------------------------------------------------------------------- diff --git a/src/parquet/schema/schema-descriptor-test.cc b/src/parquet/schema/schema-descriptor-test.cc index 7677615..3b1734b 100644 --- a/src/parquet/schema/schema-descriptor-test.cc +++ b/src/parquet/schema/schema-descriptor-test.cc @@ -17,13 +17,12 @@ // Schema / column descriptor correctness tests (from flat Parquet schemas) +#include <gtest/gtest.h> #include <cstdint> #include <cstdlib> #include <string> #include <vector> -#include <gtest/gtest.h> - #include "parquet/exception.h" #include "parquet/schema/descriptor.h" #include "parquet/schema/types.h" http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/c6e06929/src/parquet/types.h ---------------------------------------------------------------------- diff --git a/src/parquet/types.h b/src/parquet/types.h index 8c5e123..f59f6a9 100644 --- a/src/parquet/types.h +++ b/src/parquet/types.h @@ -128,11 +128,24 @@ struct PageType { // 
---------------------------------------------------------------------- struct ByteArray { + ByteArray() {} + ByteArray(uint32_t len, const uint8_t* ptr) : len(len), ptr(ptr) {} uint32_t len; const uint8_t* ptr; + + bool operator==(const ByteArray& other) const { + return this->len == other.len && + 0 == memcmp(this->ptr, other.ptr, this->len); + } + + bool operator!=(const ByteArray& other) const { + return this->len != other.len || 0 != memcmp(this->ptr, other.ptr, this->len); + } }; struct FixedLenByteArray { + FixedLenByteArray() {} + explicit FixedLenByteArray(const uint8_t* ptr) : ptr(ptr) {} const uint8_t* ptr; }; @@ -140,6 +153,14 @@ typedef FixedLenByteArray FLBA; MANUALLY_ALIGNED_STRUCT(1) Int96 { uint32_t value[3]; + + bool operator==(const Int96& other) const { + return 0 == memcmp(this->value, other.value, 3 * sizeof(uint32_t)); + } + + bool operator!=(const Int96& other) const { + return !(*this == other); + } }; STRUCT_END(Int96, 12); @@ -241,6 +262,21 @@ struct type_traits<Type::FIXED_LEN_BYTE_ARRAY> { static constexpr const char* printf_code = "s"; }; +template <Type::type TYPE> +struct DataType { + static constexpr Type::type type_num = TYPE; + typedef typename type_traits<TYPE>::value_type c_type; +}; + +typedef DataType<Type::BOOLEAN> BooleanType; +typedef DataType<Type::INT32> Int32Type; +typedef DataType<Type::INT64> Int64Type; +typedef DataType<Type::INT96> Int96Type; +typedef DataType<Type::FLOAT> FloatType; +typedef DataType<Type::DOUBLE> DoubleType; +typedef DataType<Type::BYTE_ARRAY> ByteArrayType; +typedef DataType<Type::FIXED_LEN_BYTE_ARRAY> FLBAType; + template <int TYPE> inline std::string format_fwf(int width) { std::stringstream ss; http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/c6e06929/src/parquet/util/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/src/parquet/util/CMakeLists.txt b/src/parquet/util/CMakeLists.txt index 2b782fc..a009129 100644 --- 
a/src/parquet/util/CMakeLists.txt +++ b/src/parquet/util/CMakeLists.txt @@ -20,22 +20,27 @@ install(FILES bit-stream-utils.h bit-stream-utils.inline.h bit-util.h - cpu-info.h - sse-info.h + buffer-builder.h compiler-util.h + cpu-info.h + dict-encoding.h + hash-util.h + input.h logging.h macros.h + mem-pool.h + output.h rle-encoding.h stopwatch.h - input.h - output.h + sse-info.h DESTINATION include/parquet/util) add_library(parquet_util STATIC buffer.cc + cpu-info.cc input.cc + mem-pool.cc output.cc - cpu-info.cc ) if(PARQUET_BUILD_TESTS) @@ -58,5 +63,6 @@ endif() ADD_PARQUET_TEST(bit-util-test) ADD_PARQUET_TEST(buffer-test) +ADD_PARQUET_TEST(mem-pool-test) ADD_PARQUET_TEST(output-test) ADD_PARQUET_TEST(rle-test) http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/c6e06929/src/parquet/util/bit-stream-utils.h ---------------------------------------------------------------------- diff --git a/src/parquet/util/bit-stream-utils.h b/src/parquet/util/bit-stream-utils.h index b93b90e..3636128 100644 --- a/src/parquet/util/bit-stream-utils.h +++ b/src/parquet/util/bit-stream-utils.h @@ -20,9 +20,9 @@ #ifndef PARQUET_UTIL_BIT_STREAM_UTILS_H #define PARQUET_UTIL_BIT_STREAM_UTILS_H +#include <string.h> #include <algorithm> #include <cstdint> -#include <string.h> #include "parquet/util/compiler-util.h" #include "parquet/util/logging.h" http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/c6e06929/src/parquet/util/bit-util-test.cc ---------------------------------------------------------------------- diff --git a/src/parquet/util/bit-util-test.cc b/src/parquet/util/bit-util-test.cc index a8b6be0..5ea4c11 100644 --- a/src/parquet/util/bit-util-test.cc +++ b/src/parquet/util/bit-util-test.cc @@ -19,11 +19,12 @@ #include <stdlib.h> #include <stdio.h> -#include <iostream> #include <limits.h> +#include <gtest/gtest.h> #include <boost/utility.hpp> -#include <gtest/gtest.h> + +#include <iostream> #include "parquet/util/bit-util.h" #include 
"parquet/util/bit-stream-utils.inline.h" http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/c6e06929/src/parquet/util/buffer-builder.h ---------------------------------------------------------------------- diff --git a/src/parquet/util/buffer-builder.h b/src/parquet/util/buffer-builder.h new file mode 100644 index 0000000..6fab6c5 --- /dev/null +++ b/src/parquet/util/buffer-builder.h @@ -0,0 +1,61 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +// Initially imported from Impala on 2016-02-23 + +#ifndef PARQUET_UTIL_BUFFER_BUILDER_H +#define PARQUET_UTIL_BUFFER_BUILDER_H + +#include <stdlib.h> +#include <cstdint> + +namespace parquet_cpp { + +/// Utility class to build an in-memory buffer. 
+class BufferBuilder { + public: + BufferBuilder(uint8_t* dst_buffer, int dst_len) + : buffer_(dst_buffer), capacity_(dst_len), size_(0) { + } + + BufferBuilder(char* dst_buffer, int dst_len) + : buffer_(reinterpret_cast<uint8_t*>(dst_buffer)), + capacity_(dst_len), size_(0) { + } + + inline void Append(const void* buffer, int len) { + memcpy(buffer_ + size_, buffer, len); + size_ += len; + } + + template<typename T> + inline void Append(const T& v) { + Append(&v, sizeof(T)); + } + + int capacity() const { return capacity_; } + int size() const { return size_; } + + private: + uint8_t* buffer_; + int capacity_; + int size_; +}; + +} // namespace parquet_cpp + +#endif // PARQUET_UTIL_BUFFER_BUILDER_H http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/c6e06929/src/parquet/util/cpu-info.cc ---------------------------------------------------------------------- diff --git a/src/parquet/util/cpu-info.cc b/src/parquet/util/cpu-info.cc index 610fb62..2a9f59d 100644 --- a/src/parquet/util/cpu-info.cc +++ b/src/parquet/util/cpu-info.cc @@ -24,16 +24,18 @@ #include <sys/sysctl.h> #endif +#include <mmintrin.h> +#include <stdlib.h> +#include <string.h> +#include <unistd.h> + #include <boost/algorithm/string.hpp> + #include <algorithm> #include <cstdint> #include <iostream> #include <fstream> -#include <mmintrin.h> #include <sstream> -#include <stdlib.h> -#include <string.h> -#include <unistd.h> #include <string> #include "parquet/exception.h" http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/c6e06929/src/parquet/util/dict-encoding.h ---------------------------------------------------------------------- diff --git a/src/parquet/util/dict-encoding.h b/src/parquet/util/dict-encoding.h new file mode 100644 index 0000000..315b88e --- /dev/null +++ b/src/parquet/util/dict-encoding.h @@ -0,0 +1,36 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. 
See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#ifndef PARQUET_UTIL_DICT_ENCODING_H +#define PARQUET_UTIL_DICT_ENCODING_H + +#include <algorithm> +#include <cstdint> +#include <limits> +#include <vector> + +#include "parquet/types.h" +#include "parquet/encodings/plain-encoding.h" +#include "parquet/util/hash-util.h" +#include "parquet/util/mem-pool.h" +#include "parquet/util/rle-encoding.h" + +namespace parquet_cpp { + +} // namespace parquet_cpp + +#endif // PARQUET_UTIL_DICT_ENCODING_H http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/c6e06929/src/parquet/util/hash-util.h ---------------------------------------------------------------------- diff --git a/src/parquet/util/hash-util.h b/src/parquet/util/hash-util.h new file mode 100644 index 0000000..5572ca9 --- /dev/null +++ b/src/parquet/util/hash-util.h @@ -0,0 +1,247 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +// From Apache Impala as of 2016-02-22 + +#ifndef PARQUET_UTIL_HASH_UTIL_H +#define PARQUET_UTIL_HASH_UTIL_H + +#include <cstdint> + +#include "parquet/util/compiler-util.h" +#include "parquet/util/cpu-info.h" +#include "parquet/util/logging.h" +#include "parquet/util/sse-util.h" + +namespace parquet_cpp { + +/// Utility class to compute hash values. +class HashUtil { + public: + /// Compute the Crc32 hash for data using SSE4 instructions. The input hash + /// parameter is the current hash/seed value. + /// This should only be called if SSE is supported. + /// This is ~4x faster than Fnv/Boost Hash. + /// TODO: crc32 hashes with different seeds do not result in different hash functions. + /// The resulting hashes are correlated. + /// TODO: update this to also use SSE4_crc32_u64 and SSE4_crc32_u16 where appropriate. + static uint32_t CrcHash(const void* data, int32_t bytes, uint32_t hash) { + DCHECK(CpuInfo::IsSupported(CpuInfo::SSE4_2)); + uint32_t words = bytes / sizeof(uint32_t); + bytes = bytes % sizeof(uint32_t); + + const uint32_t* p = reinterpret_cast<const uint32_t*>(data); + while (words--) { + hash = SSE4_crc32_u32(hash, *p); + ++p; + } + + const uint8_t* s = reinterpret_cast<const uint8_t*>(p); + while (bytes--) { + hash = SSE4_crc32_u8(hash, *s); + ++s; + } + + // The lower half of the CRC hash has has poor uniformity, so swap the halves + // for anyone who only uses the first several bits of the hash. 
+    hash = (hash << 16) | (hash >> 16);
+    return hash;
+  }
+
+  /// CrcHash() specialized for 1-byte data.
+  /// Each CrcHashN() below DCHECKs SSE4.2 support, folds exactly N bytes at 'v'
+  /// into 'hash' with the SSE4.2 CRC32 instructions, and then swaps the two
+  /// 16-bit halves of the result.
+  static inline uint32_t CrcHash1(const void* v, uint32_t hash) {
+    DCHECK(CpuInfo::IsSupported(CpuInfo::SSE4_2));
+    const uint8_t* s = reinterpret_cast<const uint8_t*>(v);
+    hash = SSE4_crc32_u8(hash, *s);
+    hash = (hash << 16) | (hash >> 16);
+    return hash;
+  }
+
+  /// CrcHash() specialized for 2-byte data
+  static inline uint32_t CrcHash2(const void* v, uint32_t hash) {
+    DCHECK(CpuInfo::IsSupported(CpuInfo::SSE4_2));
+    const uint16_t* s = reinterpret_cast<const uint16_t*>(v);
+    hash = SSE4_crc32_u16(hash, *s);
+    hash = (hash << 16) | (hash >> 16);
+    return hash;
+  }
+
+  /// CrcHash() specialized for 4-byte data
+  static inline uint32_t CrcHash4(const void* v, uint32_t hash) {
+    DCHECK(CpuInfo::IsSupported(CpuInfo::SSE4_2));
+    const uint32_t* p = reinterpret_cast<const uint32_t*>(v);
+    hash = SSE4_crc32_u32(hash, *p);
+    hash = (hash << 16) | (hash >> 16);
+    return hash;
+  }
+
+  /// CrcHash() specialized for 8-byte data.
+  /// NOTE(review): CrcHash8/12/16 load through reinterpret_cast'ed uint64_t /
+  /// uint32_t pointers. That assumes 'v' is suitably aligned, or that the CPU
+  /// tolerates unaligned loads (as the SSE4.2 hardware this path requires does in
+  /// practice). Strictly speaking, misaligned input is undefined behavior here.
+  static inline uint32_t CrcHash8(const void* v, uint32_t hash) {
+    DCHECK(CpuInfo::IsSupported(CpuInfo::SSE4_2));
+    const uint64_t* p = reinterpret_cast<const uint64_t*>(v);
+    hash = SSE4_crc32_u64(hash, *p);
+    hash = (hash << 16) | (hash >> 16);
+    return hash;
+  }
+
+  /// CrcHash() specialized for 12-byte data
+  static inline uint32_t CrcHash12(const void* v, uint32_t hash) {
+    DCHECK(CpuInfo::IsSupported(CpuInfo::SSE4_2));
+    const uint64_t* p = reinterpret_cast<const uint64_t*>(v);
+    hash = SSE4_crc32_u64(hash, *p);
+    ++p;
+    hash = SSE4_crc32_u32(hash, *reinterpret_cast<const uint32_t *>(p));
+    hash = (hash << 16) | (hash >> 16);
+    return hash;
+  }
+
+  /// CrcHash() specialized for 16-byte data
+  static inline uint32_t CrcHash16(const void* v, uint32_t hash) {
+    DCHECK(CpuInfo::IsSupported(CpuInfo::SSE4_2));
+    const uint64_t* p = reinterpret_cast<const uint64_t*>(v);
+    hash = SSE4_crc32_u64(hash, *p);
+    ++p;
+    hash = SSE4_crc32_u64(hash, *p);
+    hash = (hash << 16) | (hash >> 16);
+    return hash;
+  }
+
+  static const uint64_t MURMUR_PRIME = 0xc6a4a7935bd1e995;
+  static const int MURMUR_R = 47;
+
+  /// Murmur2 hash implementation returning 64-bit hashes.
+  ///
+  /// 'input' may have any alignment. The 8-byte words are assembled from
+  /// individual bytes, in little-endian order, instead of being loaded through a
+  /// reinterpret_cast'ed uint64_t pointer. Such a load is undefined behavior on
+  /// misaligned data and can fault on strict-alignment CPUs. This function is the
+  /// path Hash() takes when SSE4.2 is unavailable, which includes non-x86 hardware.
+  /// On little-endian hosts the result is identical to a native 64-bit load.
+  static uint64_t MurmurHash2_64(const void* input, int len, uint64_t seed) {
+    uint64_t h = seed ^ (len * MURMUR_PRIME);
+
+    const uint8_t* data = reinterpret_cast<const uint8_t*>(input);
+    const uint8_t* end = data + (len / sizeof(uint64_t)) * sizeof(uint64_t);
+
+    for (; data != end; data += sizeof(uint64_t)) {
+      // Little-endian assembly of the next 8 bytes (alignment-safe, see above).
+      uint64_t k = 0;
+      for (int i = static_cast<int>(sizeof(uint64_t)) - 1; i >= 0; --i) {
+        k = (k << 8) | data[i];
+      }
+      k *= MURMUR_PRIME;
+      k ^= k >> MURMUR_R;
+      k *= MURMUR_PRIME;
+      h ^= k;
+      h *= MURMUR_PRIME;
+    }
+
+    // Mix in the trailing (len % 8) bytes. Every case deliberately falls through
+    // so that all remaining bytes are folded in before the final multiply.
+    switch (len & 7) {
+      case 7:
+        h ^= static_cast<uint64_t>(data[6]) << 48;
+        // fall through
+      case 6:
+        h ^= static_cast<uint64_t>(data[5]) << 40;
+        // fall through
+      case 5:
+        h ^= static_cast<uint64_t>(data[4]) << 32;
+        // fall through
+      case 4:
+        h ^= static_cast<uint64_t>(data[3]) << 24;
+        // fall through
+      case 3:
+        h ^= static_cast<uint64_t>(data[2]) << 16;
+        // fall through
+      case 2:
+        h ^= static_cast<uint64_t>(data[1]) << 8;
+        // fall through
+      case 1:
+        h ^= static_cast<uint64_t>(data[0]);
+        h *= MURMUR_PRIME;
+    }
+
+    h ^= h >> MURMUR_R;
+    h *= MURMUR_PRIME;
+    h ^= h >> MURMUR_R;
+    return h;
+  }
+
+  /// default values recommended by http://isthe.com/chongo/tech/comp/fnv/
+  static const uint32_t FNV_PRIME = 0x01000193;  // 16777619
+  static const uint32_t FNV_SEED = 0x811C9DC5;  // 2166136261
+  static const uint64_t FNV64_PRIME = 1099511628211UL;
+  static const uint64_t FNV64_SEED = 14695981039346656037UL;
+
+  /// Implementation of the Fowler-Noll-Vo hash function. This is not as performant
+  /// as boost's hash on int types (2x slower) but has bit entropy.
+  /// For ints, boost just returns the value of the int which can be pathological.
+  /// For example, if the data is <1000, 2000, 3000, 4000, ..> and then the mod of 1000
+  /// is taken on the hash, all values will collide to the same bucket.
+  /// For string values, Fnv is slightly faster than boost.
+  /// IMPORTANT: FNV hash suffers from poor diffusion of the least significant bit,
+  /// which can lead to poor results when input bytes are duplicated.
+  /// See FnvHash64to32() for how this can be mitigated.
+  /// A non-positive 'bytes' hashes no data and returns 'hash' unchanged.
+  static uint64_t FnvHash64(const void* data, int32_t bytes, uint64_t hash) {
+    const uint8_t* ptr = reinterpret_cast<const uint8_t*>(data);
+    // Clamp negative lengths to zero instead of decrementing a signed counter
+    // until it overflows while reading far past the end of the buffer.
+    const uint8_t* const end = ptr + (bytes > 0 ? bytes : 0);
+    for (; ptr != end; ++ptr) {
+      hash = (*ptr ^ hash) * FNV64_PRIME;
+    }
+    return hash;
+  }
+
+  /// Return a 32-bit hash computed by invoking FNV-64 and folding the result to 32-bits.
+  /// This technique is recommended instead of FNV-32 since the LSB of an FNV hash is the
+  /// XOR of the LSBs of its input bytes, leading to poor results for duplicate inputs.
+  /// The input seed 'hash' is duplicated so the top half of the seed is not all zero.
+  /// Data length must be at least 1 byte: zero-length data should be handled separately,
+  /// for example using HashCombine32() with a unique constant value to avoid returning
+  /// the hash argument. Zero-length data gives terrible results: the initial hash value
+  /// is xored with itself cancelling all bits.
+  static uint32_t FnvHash64to32(const void* data, int32_t bytes, uint32_t hash) {
+    // IMPALA-2270: this function should never be used for zero-byte inputs.
+    DCHECK_GT(bytes, 0);
+    // Replicate the 32-bit seed into both halves of the 64-bit FNV state.
+    uint64_t hash_u64 = hash | (static_cast<uint64_t>(hash) << 32);
+    hash_u64 = FnvHash64(data, bytes, hash_u64);
+    // Fold the 64-bit result down to 32 bits by XOR-ing its two halves.
+    return static_cast<uint32_t>((hash_u64 >> 32) ^ (hash_u64 & 0xFFFFFFFF));
+  }
+
+  /// Computes a 32-bit hash of the 'bytes' bytes at 'data'. Uses CrcHash when the
+  /// CPU supports SSE4.2 and MurmurHash (truncated to 32 bits) otherwise, so the two
+  /// paths yield different values for the same input.
+  /// Hashes computed for different purposes (e.g. independent hash tables) should use
+  /// different seeds to prevent accidental key collisions. (See IMPALA-219 for more
+  /// details).
+  static uint32_t Hash(const void* data, int32_t bytes, uint32_t seed) {
+    // Fast path: hardware CRC32. Otherwise keep the low 32 bits of Murmur2-64.
+    if (LIKELY(CpuInfo::IsSupported(CpuInfo::SSE4_2))) return CrcHash(data, bytes, seed);
+    const uint64_t wide = MurmurHash2_64(data, bytes, seed);
+    return static_cast<uint32_t>(wide);
+  }
+
+  /// 0x9e3779b9 == 2^32 / (golden ratio); the same constant boost's hash_combine() uses.
+  static const uint32_t HASH_COMBINE_SEED = 0x9e3779b9;
+
+  /// Mixes hash 'value' into hash 'seed' and returns the combined 32-bit hash, in the
+  /// style of boost::hash_combine() restricted to uint32_t. Passing a constant as
+  /// 'value' is the intended way to update a hash for zero-length values such as
+  /// NULL, boolean, and empty strings.
+  static inline uint32_t HashCombine32(uint32_t value, uint32_t seed) {
+    const uint32_t mixed = HASH_COMBINE_SEED + value + (seed << 6) + (seed >> 2);
+    return seed ^ mixed;
+  }
+
+  // Remix a 32-bit hash into a fresh 32-bit hash.
+  static inline uint32_t Rehash32to32(const uint32_t hash) {
+    // Multiplier and addend were generated by uuidgen(1) with the -r flag.
+    static const uint64_t kMul = 0x7850f11ec6d14889ull;
+    static const uint64_t kAdd = 0x6773610597ca4c63ull;
+    // Strongly universal hashing after Dietzfelbinger, "Universal hashing and k-wise
+    // independent random variables via integer arithmetic without primes": for any two
+    // distinct inputs, the probability (over the choice of constants) that any subset
+    // of output bit positions agrees between their rehashes is minimal.
+    const uint64_t widened = hash;
+    return static_cast<uint32_t>((widened * kMul + kAdd) >> 32);
+  }
+
+  // Expand a 32-bit hash into a 64-bit hash built from two independent
+  // multiply-add-shift rehashes (low word from m1/a1, high word from m2/a2).
+  static inline uint64_t Rehash32to64(const uint32_t hash) {
+    static const uint64_t m1 = 0x47b6137a44974d91ull;
+    static const uint64_t m2 = 0x8824ad5ba2b7289cull;
+    static const uint64_t a1 = 0x705495c62df1424aull;
+    static const uint64_t a2 = 0x9efc49475c6bfb31ull;
+    const uint64_t x = hash;
+    const uint64_t lo = (x * m1 + a1) >> 32;
+    const uint64_t hi = (x * m2 + a2) >> 32;
+    return (hi << 32) | lo;
+  }
+};
+
+}  // namespace parquet_cpp
+
+#endif  // PARQUET_UTIL_HASH_UTIL_H
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/c6e06929/src/parquet/util/mem-pool-test.cc
----------------------------------------------------------------------
diff --git a/src/parquet/util/mem-pool-test.cc b/src/parquet/util/mem-pool-test.cc
new file mode 100644
index 0000000..de0b399
--- /dev/null
+++ b/src/parquet/util/mem-pool-test.cc
@@ -0,0 +1,247 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// Initially imported from Apache Impala on 2016-02-23, and has been modified
+// since for parquet-cpp
+
+#include <gtest/gtest.h>
+#include <cstdint>
+#include <cstring>
+#include <limits>
+#include <string>
+
+#include "parquet/util/mem-pool.h"
+#include "parquet/util/bit-util.h"
+
+namespace parquet_cpp {
+
+// Utility class to call private functions on MemPool.
+class MemPoolTest {
+ public:
+  static bool CheckIntegrity(MemPool* pool, bool current_chunk_empty) {
+    return pool->CheckIntegrity(current_chunk_empty);
+  }
+
+  static const int INITIAL_CHUNK_SIZE = MemPool::INITIAL_CHUNK_SIZE;
+  static const int MAX_CHUNK_SIZE = MemPool::MAX_CHUNK_SIZE;
+};
+
+// Out-of-class definitions: the gtest macros bind these constants by reference.
+const int MemPoolTest::INITIAL_CHUNK_SIZE;
+const int MemPoolTest::MAX_CHUNK_SIZE;
+
+TEST(MemPoolTest, Basic) {
+  MemPool p;
+  MemPool p2;
+  MemPool p3;
+
+  for (int iter = 0; iter < 2; ++iter) {
+    // allocate a total of 24K in 32-byte pieces (for which we only request 25 bytes)
+    for (int i = 0; i < 768; ++i) {
+      // pads to 32 bytes
+      p.Allocate(25);
+    }
+    // we handed back 24K
+    EXPECT_EQ(24 * 1024, p.total_allocated_bytes());
+    // .. and allocated 28K of chunks (4, 8, 16)
+    EXPECT_EQ(28 * 1024, p.GetTotalChunkSizes());
+
+    // we're passing on the first two chunks, containing 12K of data; we're left with
+    // one chunk of 16K containing 12K of data
+    p2.AcquireData(&p, true);
+    EXPECT_EQ(12 * 1024, p.total_allocated_bytes());
+    EXPECT_EQ(16 * 1024, p.GetTotalChunkSizes());
+
+    // we allocate 8K, for which there isn't enough room in the current chunk,
+    // so another one is allocated (32K)
+    p.Allocate(8 * 1024);
+    EXPECT_EQ((16 + 32) * 1024, p.GetTotalChunkSizes());
+
+    // we allocate 65K, which doesn't fit into the current chunk or the default
+    // size of the next allocated chunk (64K)
+    p.Allocate(65 * 1024);
+    EXPECT_EQ((12 + 8 + 65) * 1024, p.total_allocated_bytes());
+    // peak_allocated_bytes() survives FreeAll(): on the second iteration it still
+    // reports the (1 + 120 + 33)K high-water mark reached during the first one.
+    if (iter == 0) {
+      EXPECT_EQ((12 + 8 + 65) * 1024, p.peak_allocated_bytes());
+    } else {
+      EXPECT_EQ((1 + 120 + 33) * 1024, p.peak_allocated_bytes());
+    }
+    EXPECT_EQ((16 + 32 + 65) * 1024, p.GetTotalChunkSizes());
+
+    // Clear() resets allocated data, but doesn't remove any chunks
+    p.Clear();
+    EXPECT_EQ(0, p.total_allocated_bytes());
+    if (iter == 0) {
+      EXPECT_EQ((12 + 8 + 65) * 1024, p.peak_allocated_bytes());
+    } else {
+      EXPECT_EQ((1 + 120 + 33) * 1024, p.peak_allocated_bytes());
+    }
+    EXPECT_EQ((16 + 32 + 65) * 1024, p.GetTotalChunkSizes());
+
+    // next allocation reuses existing chunks
+    p.Allocate(1024);
+    EXPECT_EQ(1024, p.total_allocated_bytes());
+    if (iter == 0) {
+      EXPECT_EQ((12 + 8 + 65) * 1024, p.peak_allocated_bytes());
+    } else {
+      EXPECT_EQ((1 + 120 + 33) * 1024, p.peak_allocated_bytes());
+    }
+    EXPECT_EQ((16 + 32 + 65) * 1024, p.GetTotalChunkSizes());
+
+    // ... unless it doesn't fit into any available chunk
+    p.Allocate(120 * 1024);
+    EXPECT_EQ((1 + 120) * 1024, p.total_allocated_bytes());
+    if (iter == 0) {
+      EXPECT_EQ((1 + 120) * 1024, p.peak_allocated_bytes());
+    } else {
+      EXPECT_EQ((1 + 120 + 33) * 1024, p.peak_allocated_bytes());
+    }
+    EXPECT_EQ((130 + 16 + 32 + 65) * 1024, p.GetTotalChunkSizes());
+
+    // ... Try another chunk that fits into an existing chunk
+    p.Allocate(33 * 1024);
+    EXPECT_EQ((1 + 120 + 33) * 1024, p.total_allocated_bytes());
+    EXPECT_EQ((130 + 16 + 32 + 65) * 1024, p.GetTotalChunkSizes());
+
+    // we're releasing 3 chunks, which get added to p2
+    p2.AcquireData(&p, false);
+    EXPECT_EQ(0, p.total_allocated_bytes());
+    EXPECT_EQ((1 + 120 + 33) * 1024, p.peak_allocated_bytes());
+    EXPECT_EQ(0, p.GetTotalChunkSizes());
+
+    p3.AcquireData(&p2, true);  // we're keeping the 65k chunk
+    EXPECT_EQ(33 * 1024, p2.total_allocated_bytes());
+    EXPECT_EQ(65 * 1024, p2.GetTotalChunkSizes());
+
+    p.FreeAll();
+    p2.FreeAll();
+    p3.FreeAll();
+  }
+}
+
+// Test that we can keep an allocated chunk and a free chunk.
+// This case verifies that when chunks are acquired by another memory pool the
+// remaining chunks are consistent if there were more than one used chunk and some
+// free chunks.
+TEST(MemPoolTest, Keep) {
+  MemPool p;
+  p.Allocate(4*1024);
+  p.Allocate(8*1024);
+  p.Allocate(16*1024);
+  EXPECT_EQ((4 + 8 + 16) * 1024, p.total_allocated_bytes());
+  EXPECT_EQ((4 + 8 + 16) * 1024, p.GetTotalChunkSizes());
+  p.Clear();
+  EXPECT_EQ(0, p.total_allocated_bytes());
+  EXPECT_EQ((4 + 8 + 16) * 1024, p.GetTotalChunkSizes());
+  p.Allocate(1*1024);
+  p.Allocate(4*1024);
+  EXPECT_EQ((1 + 4) * 1024, p.total_allocated_bytes());
+  EXPECT_EQ((4 + 8 + 16) * 1024, p.GetTotalChunkSizes());
+
+  MemPool p2;
+  p2.AcquireData(&p, true);
+  EXPECT_EQ(4 * 1024, p.total_allocated_bytes());
+  EXPECT_EQ((8 + 16) * 1024, p.GetTotalChunkSizes());
+  EXPECT_EQ(1 * 1024, p2.total_allocated_bytes());
+  EXPECT_EQ(4 * 1024, p2.GetTotalChunkSizes());
+
+  p.FreeAll();
+  p2.FreeAll();
+}
+
+// Tests that we can return partial allocations.
+// Pointer equality is checked with EXPECT_TRUE rather than EXPECT_EQ because gtest
+// prints uint8_t* operands as C strings on failure, which could read past the buffer.
+TEST(MemPoolTest, ReturnPartial) {
+  MemPool p;
+  uint8_t* ptr = p.Allocate(1024);
+  EXPECT_EQ(1024, p.total_allocated_bytes());
+  std::memset(ptr, 0, 1024);
+  p.ReturnPartialAllocation(1024);
+
+  uint8_t* ptr2 = p.Allocate(1024);
+  EXPECT_EQ(1024, p.total_allocated_bytes());
+  EXPECT_TRUE(ptr == ptr2);
+  p.ReturnPartialAllocation(1016);
+
+  ptr2 = p.Allocate(1016);
+  EXPECT_EQ(1024, p.total_allocated_bytes());
+  EXPECT_TRUE(ptr2 == ptr + 8);
+  p.ReturnPartialAllocation(512);
+  std::memset(ptr2, 1, 1016 - 512);
+
+  uint8_t* ptr3 = p.Allocate(512);
+  EXPECT_EQ(1024, p.total_allocated_bytes());
+  EXPECT_TRUE(ptr3 == ptr + 512);
+  std::memset(ptr3, 2, 512);
+
+  for (int i = 0; i < 8; ++i) {
+    EXPECT_EQ(0, ptr[i]);
+  }
+  for (int i = 8; i < 512; ++i) {
+    EXPECT_EQ(1, ptr[i]);
+  }
+  for (int i = 512; i < 1024; ++i) {
+    EXPECT_EQ(2, ptr[i]);
+  }
+
+  p.FreeAll();
+}
+
+// Test that the MemPool overhead is bounded when we make allocations of
+// INITIAL_CHUNK_SIZE.
+TEST(MemPoolTest, MemoryOverhead) {
+  MemPool p;
+  const int alloc_size = MemPoolTest::INITIAL_CHUNK_SIZE;
+  const int num_allocs = 1000;
+  int64_t total_allocated = 0;
+
+  for (int i = 0; i < num_allocs; ++i) {
+    uint8_t* mem = p.Allocate(alloc_size);
+    ASSERT_TRUE(mem != nullptr);
+    total_allocated += alloc_size;
+
+    int64_t wasted_memory = p.GetTotalChunkSizes() - total_allocated;
+    // The initial chunk fits evenly into MAX_CHUNK_SIZE, so should have at most
+    // one empty chunk at the end.
+    EXPECT_LE(wasted_memory, MemPoolTest::MAX_CHUNK_SIZE);
+    // The chunk doubling algorithm should not allocate chunks larger than the total
+    // amount of memory already allocated.
+    EXPECT_LE(wasted_memory, total_allocated);
+  }
+
+  p.FreeAll();
+}
+
+// Test that the MemPool overhead is bounded when we make alternating large and small
+// allocations.
+TEST(MemPoolTest, FragmentationOverhead) {
+  MemPool p;
+  const int num_allocs = 100;
+  int64_t total_allocated = 0;
+
+  for (int i = 0; i < num_allocs; ++i) {
+    int alloc_size = i % 2 == 0 ? 1 : MemPoolTest::MAX_CHUNK_SIZE;
+    uint8_t* mem = p.Allocate(alloc_size);
+    ASSERT_TRUE(mem != nullptr);
+    total_allocated += alloc_size;
+
+    int64_t wasted_memory = p.GetTotalChunkSizes() - total_allocated;
+    // Fragmentation should not waste more than half of each completed chunk (i.e.
+    // waste <= bytes used from it), plus at most one partially-filled chunk of up to
+    // MAX_CHUNK_SIZE, hence the bound total_allocated + MAX_CHUNK_SIZE.
+    EXPECT_LE(wasted_memory, total_allocated + MemPoolTest::MAX_CHUNK_SIZE);
+  }
+
+  p.FreeAll();
+}
+
+}  // namespace parquet_cpp
