http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/b7150fb1/storage/PackedRowStoreValueAccessor.hpp ---------------------------------------------------------------------- diff --git a/storage/PackedRowStoreValueAccessor.hpp b/storage/PackedRowStoreValueAccessor.hpp index 80edecd..1520b3a 100644 --- a/storage/PackedRowStoreValueAccessor.hpp +++ b/storage/PackedRowStoreValueAccessor.hpp @@ -20,6 +20,8 @@ #ifndef QUICKSTEP_STORAGE_PACKED_ROW_STORE_VALUE_ACCESSOR_HPP_ #define QUICKSTEP_STORAGE_PACKED_ROW_STORE_VALUE_ACCESSOR_HPP_ +#include <utility> + #include "catalog/CatalogRelationSchema.hpp" #include "catalog/CatalogTypedefs.hpp" #include "storage/StorageBlockInfo.hpp" @@ -42,7 +44,8 @@ class PackedRowStoreValueAccessorHelper { : relation_(relation), num_tuples_(num_tuples), tuple_storage_(tuple_storage), - null_bitmap_(null_bitmap) { + null_bitmap_(null_bitmap), + attr_max_lengths_(relation.getMaximumAttributeByteLengths()) { } inline tuple_id numPackedTuples() const { @@ -67,6 +70,25 @@ class PackedRowStoreValueAccessorHelper { + relation_.getFixedLengthAttributeOffset(attr); // Attribute offset within tuple. 
} + template <bool check_null> + inline std::pair<const void*, std::size_t> getAttributeValueAndByteLength(const tuple_id tuple, + const attribute_id attr) const { + DEBUG_ASSERT(tuple < num_tuples_); + DEBUG_ASSERT(relation_.hasAttributeWithId(attr)); + if (check_null) { + const int nullable_idx = relation_.getNullableAttributeIndex(attr); + if ((nullable_idx != -1) + && null_bitmap_->getBit(tuple * relation_.numNullableAttributes() + nullable_idx)) { + return std::make_pair(nullptr, 0); + } + } + + return std::make_pair(static_cast<const char*>(tuple_storage_) + + (tuple * relation_.getFixedByteLength()) + + relation_.getFixedLengthAttributeOffset(attr), + attr_max_lengths_[attr]); + } + inline TypedValue getAttributeValueTyped(const tuple_id tuple, const attribute_id attr) const { const Type &attr_type = relation_.getAttributeById(attr)->getType(); @@ -81,6 +103,7 @@ class PackedRowStoreValueAccessorHelper { const tuple_id num_tuples_; const void *tuple_storage_; const BitVector<false> *null_bitmap_; + const std::vector<std::size_t> &attr_max_lengths_; DISALLOW_COPY_AND_ASSIGN(PackedRowStoreValueAccessorHelper); };
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/b7150fb1/storage/SplitRowStoreValueAccessor.hpp ---------------------------------------------------------------------- diff --git a/storage/SplitRowStoreValueAccessor.hpp b/storage/SplitRowStoreValueAccessor.hpp index 61bb7bf..e2d2b47 100644 --- a/storage/SplitRowStoreValueAccessor.hpp +++ b/storage/SplitRowStoreValueAccessor.hpp @@ -102,6 +102,11 @@ class SplitRowStoreValueAccessor : public ValueAccessor { return getUntypedValueAtAbsolutePosition<check_null>(attr_id, current_position_); } + template <bool check_null = true> + inline std::pair<const void*, std::size_t> getUntypedValueAndByteLength(const attribute_id attr_id) const { + return getUntypedValueAndByteLengthAtAbsolutePosition<check_null>(attr_id, current_position_); + } + inline TypedValue getTypedValue(const attribute_id attr_id) const { return getTypedValueAtAbsolutePosition(attr_id, current_position_); } @@ -142,6 +147,44 @@ class SplitRowStoreValueAccessor : public ValueAccessor { } } + template <bool check_null = true> + inline std::pair<const void*, std::size_t> getUntypedValueAndByteLengthAtAbsolutePosition(const attribute_id attr_id, + const tuple_id tid) const { + DEBUG_ASSERT(occupancy_bitmap_.getBit(tid)); + DEBUG_ASSERT(relation_.hasAttributeWithId(attr_id)); + const char *tuple_slot = static_cast<const char*>(tuple_storage_) + + tuple_slot_bytes_ * tid; + if (check_null) { + const int nullable_idx = relation_.getNullableAttributeIndex(attr_id); + if (nullable_idx != -1) { + // const_cast is safe here. We will only be using read-only methods of + // BitVector. 
+ BitVector<true> tuple_null_bitmap(const_cast<void*>(static_cast<const void*>(tuple_slot)), + relation_.numNullableAttributes()); + if (tuple_null_bitmap.getBit(nullable_idx)) { + return std::make_pair(nullptr, 0); + } + } + } + + const int variable_length_idx = relation_.getVariableLengthAttributeIndex(attr_id); + if (variable_length_idx == -1) { + // Fixed-length, stored in-line in slot. + return std::make_pair(tuple_slot + per_tuple_null_bitmap_bytes_ + + relation_.getFixedLengthAttributeOffset(attr_id), + attr_max_lengths_[attr_id]); + + } else { + // Variable-length, stored at back of block. + const std::uint32_t *pos_ptr = reinterpret_cast<const std::uint32_t*>( + tuple_slot + per_tuple_null_bitmap_bytes_ + + relation_.getFixedByteLength() + + variable_length_idx * 2 * sizeof(std::uint32_t)); + return std::make_pair(static_cast<const char*>(tuple_storage_) + pos_ptr[0], + pos_ptr[1]); + } + } + inline TypedValue getTypedValueAtAbsolutePosition(const attribute_id attr_id, const tuple_id tid) const { DEBUG_ASSERT(occupancy_bitmap_.getBit(tid)); @@ -319,6 +362,7 @@ class SplitRowStoreValueAccessor : public ValueAccessor { tuple_storage_(tuple_storage), tuple_slot_bytes_(tuple_slot_bytes), per_tuple_null_bitmap_bytes_(per_tuple_null_bitmap_bytes), + attr_max_lengths_(relation.getMaximumAttributeByteLengths()), current_position_(std::numeric_limits<std::size_t>::max()) { } @@ -329,6 +373,7 @@ class SplitRowStoreValueAccessor : public ValueAccessor { const void *tuple_storage_; const std::size_t tuple_slot_bytes_; const std::size_t per_tuple_null_bitmap_bytes_; + const std::vector<std::size_t> &attr_max_lengths_; std::size_t current_position_; http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/b7150fb1/storage/StorageBlock.cpp ---------------------------------------------------------------------- diff --git a/storage/StorageBlock.cpp b/storage/StorageBlock.cpp index 21aa12c..8370418 100644 --- a/storage/StorageBlock.cpp +++ b/storage/StorageBlock.cpp 
@@ -389,15 +389,7 @@ AggregationState* StorageBlock::aggregate( const AggregationHandle &handle, const std::vector<std::unique_ptr<const Scalar>> &arguments, const std::vector<attribute_id> *arguments_as_attributes, - const Predicate *predicate, std::unique_ptr<TupleIdSequence> *reuse_matches) const { - // If there is a filter predicate that hasn't already been evaluated, - // evaluate it now and save the results for other aggregates on this same - // block. - if (predicate && !*reuse_matches) { - reuse_matches->reset(getMatchesForPredicate(predicate)); - } - #ifdef QUICKSTEP_ENABLE_VECTOR_COPY_ELISION_SELECTION // If all the arguments to this aggregate are plain relation attributes, // aggregate directly on a ValueAccessor from this block to avoid a copy. @@ -418,7 +410,6 @@ void StorageBlock::aggregateGroupBy( const AggregationHandle &handle, const std::vector<std::unique_ptr<const Scalar>> &arguments, const std::vector<std::unique_ptr<const Scalar>> &group_by, - const Predicate *predicate, AggregationStateHashTableBase *hash_table, std::unique_ptr<TupleIdSequence> *reuse_matches, std::vector<std::unique_ptr<ColumnVector>> *reuse_group_by_vectors) const { @@ -440,14 +431,7 @@ void StorageBlock::aggregateGroupBy( ColumnVectorsValueAccessor temp_result; { std::unique_ptr<ValueAccessor> accessor; - if (predicate) { - if (!*reuse_matches) { - // If there is a filter predicate that hasn't already been evaluated, - // evaluate it now and save the results for other aggregates on this - // same block. - reuse_matches->reset(getMatchesForPredicate(predicate)); - } - + if (reuse_matches) { // Create a filtered ValueAccessor that only iterates over predicate // matches. 
accessor.reset(tuple_store_->createValueAccessor(reuse_matches->get())); @@ -499,7 +483,6 @@ void StorageBlock::aggregateDistinct( const std::vector<std::unique_ptr<const Scalar>> &arguments, const std::vector<attribute_id> *arguments_as_attributes, const std::vector<std::unique_ptr<const Scalar>> &group_by, - const Predicate *predicate, AggregationStateHashTableBase *distinctify_hash_table, std::unique_ptr<TupleIdSequence> *reuse_matches, std::vector<std::unique_ptr<ColumnVector>> *reuse_group_by_vectors) const { @@ -514,14 +497,7 @@ void StorageBlock::aggregateDistinct( ColumnVectorsValueAccessor temp_result; { std::unique_ptr<ValueAccessor> accessor; - if (predicate) { - if (!*reuse_matches) { - // If there is a filter predicate that hasn't already been evaluated, - // evaluate it now and save the results for other aggregates on this - // same block. - reuse_matches->reset(getMatchesForPredicate(predicate)); - } - + if (reuse_matches) { // Create a filtered ValueAccessor that only iterates over predicate // matches. 
accessor.reset(tuple_store_->createValueAccessor(reuse_matches->get())); http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/b7150fb1/storage/StorageBlock.hpp ---------------------------------------------------------------------- diff --git a/storage/StorageBlock.hpp b/storage/StorageBlock.hpp index 97b4773..2a20cb5 100644 --- a/storage/StorageBlock.hpp +++ b/storage/StorageBlock.hpp @@ -412,7 +412,6 @@ class StorageBlock : public StorageBlockBase { const AggregationHandle &handle, const std::vector<std::unique_ptr<const Scalar>> &arguments, const std::vector<attribute_id> *arguments_as_attributes, - const Predicate *predicate, std::unique_ptr<TupleIdSequence> *reuse_matches) const; /** @@ -462,7 +461,6 @@ class StorageBlock : public StorageBlockBase { void aggregateGroupBy(const AggregationHandle &handle, const std::vector<std::unique_ptr<const Scalar>> &arguments, const std::vector<std::unique_ptr<const Scalar>> &group_by, - const Predicate *predicate, AggregationStateHashTableBase *hash_table, std::unique_ptr<TupleIdSequence> *reuse_matches, std::vector<std::unique_ptr<ColumnVector>> @@ -507,7 +505,6 @@ class StorageBlock : public StorageBlockBase { const std::vector<std::unique_ptr<const Scalar>> &arguments, const std::vector<attribute_id> *arguments_as_attributes, const std::vector<std::unique_ptr<const Scalar>> &group_by, - const Predicate *predicate, AggregationStateHashTableBase *distinctify_hash_table, std::unique_ptr<TupleIdSequence> *reuse_matches, std::vector<std::unique_ptr<ColumnVector>> *reuse_group_by_vectors) const; @@ -590,6 +587,8 @@ class StorageBlock : public StorageBlockBase { **/ const std::size_t getNumTuples() const; + TupleIdSequence* getMatchesForPredicate(const Predicate *predicate) const; + private: static TupleStorageSubBlock* CreateTupleStorageSubBlock( const CatalogRelationSchema &relation, @@ -629,8 +628,6 @@ class StorageBlock : public StorageBlockBase { // StorageBlock's header. 
bool rebuildIndexes(bool short_circuit); - TupleIdSequence* getMatchesForPredicate(const Predicate *predicate) const; - std::unordered_map<attribute_id, TypedValue>* generateUpdatedValues( const ValueAccessor &accessor, const tuple_id tuple, http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/b7150fb1/storage/ValueAccessor.hpp ---------------------------------------------------------------------- diff --git a/storage/ValueAccessor.hpp b/storage/ValueAccessor.hpp index 70d4405..b107390 100644 --- a/storage/ValueAccessor.hpp +++ b/storage/ValueAccessor.hpp @@ -377,6 +377,11 @@ class TupleIdSequenceAdapterValueAccessor : public ValueAccessor { return accessor_->template getUntypedValueAtAbsolutePosition<check_null>(attr_id, *current_position_); } + template <bool check_null = true> + inline std::pair<const void*, std::size_t> getUntypedValueAndByteLength(const attribute_id attr_id) const { + return accessor_->template getUntypedValueAndByteLengthAtAbsolutePosition<check_null>(attr_id, *current_position_); + } + inline TypedValue getTypedValue(const attribute_id attr_id) const { return accessor_->getTypedValueAtAbsolutePosition(attr_id, *current_position_); } @@ -389,6 +394,13 @@ class TupleIdSequenceAdapterValueAccessor : public ValueAccessor { } // Pass-through. + template <bool check_null = true> + inline std::pair<const void*, std::size_t> getUntypedValueAndByteLengthAtAbsolutePosition(const attribute_id attr_id, + const tuple_id tid) const { + return accessor_->template getUntypedValueAndByteLengthAtAbsolutePosition<check_null>(attr_id, tid); + } + + // Pass-through. 
inline TypedValue getTypedValueAtAbsolutePosition(const attribute_id attr_id, const tuple_id tid) const { return accessor_->getTypedValueAtAbsolutePosition(attr_id, tid); @@ -562,6 +574,12 @@ class OrderedTupleIdSequenceAdapterValueAccessor : public ValueAccessor { id_sequence_[current_position_]); } + template <bool check_null = true> + inline std::pair<const void*, std::size_t> getUntypedValueAndByteLength(const attribute_id attr_id) const { + return accessor_->template getUntypedValueAndByteLengthAtAbsolutePosition<check_null>( + attr_id, id_sequence_[current_position_]); + } + inline TypedValue getTypedValue(const attribute_id attr_id) const { return accessor_->getTypedValueAtAbsolutePosition(attr_id, id_sequence_[current_position_]); } @@ -573,6 +591,13 @@ class OrderedTupleIdSequenceAdapterValueAccessor : public ValueAccessor { "OrderedTupleIdSequenceAdapterValueAccessor"); } + template <bool check_null = true> + inline std::pair<const void*, std::size_t> getUntypedValueAndByteLengthAtAbsolutePosition(const attribute_id attr_id, + const tuple_id tid) const { + FATAL_ERROR("getUntypedValueAndByteLengthAtAbsolutePosition() not implemented in " + "OrderedTupleIdSequenceAdapterValueAccessor"); + } + inline TypedValue getTypedValueAtAbsolutePosition(const attribute_id attr_id, const tuple_id tid) const { FATAL_ERROR("getTypedValueAtAbsolutePosition() not implemented in " @@ -739,6 +764,11 @@ class PackedTupleStorageSubBlockValueAccessor : public ValueAccessor { return getUntypedValueAtAbsolutePosition<check_null>(attr_id, current_tuple_); } + template <bool check_null = true> + inline std::pair<const void*, std::size_t> getUntypedValueAndByteLength(const attribute_id attr_id) const { + return getUntypedValueAndByteLengthAtAbsolutePosition<check_null>(attr_id, current_tuple_); + } + inline TypedValue getTypedValue(const attribute_id attr_id) const { return getTypedValueAtAbsolutePosition(attr_id, current_tuple_); } @@ -749,6 +779,12 @@ class 
PackedTupleStorageSubBlockValueAccessor : public ValueAccessor { return helper_.template getAttributeValue<check_null>(tid, attr_id); } + template <bool check_null = true> + inline std::pair<const void*, std::size_t> getUntypedValueAndByteLengthAtAbsolutePosition(const attribute_id attr_id, + const tuple_id tid) const { + return helper_.template getAttributeValueAndByteLength<check_null>(tid, attr_id); + } + inline TypedValue getTypedValueAtAbsolutePosition(const attribute_id attr_id, const tuple_id tid) const { return helper_.getAttributeValueTyped(tid, attr_id); http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/b7150fb1/types/containers/ColumnVector.hpp ---------------------------------------------------------------------- diff --git a/types/containers/ColumnVector.hpp b/types/containers/ColumnVector.hpp index fc65656..0817054 100644 --- a/types/containers/ColumnVector.hpp +++ b/types/containers/ColumnVector.hpp @@ -195,6 +195,22 @@ class NativeColumnVector : public ColumnVector { } /** + * @brief Get the untyped pointer to a value as well as the value's byte length + * in this NativeColumnVector as a pair. + * + * @param position The position of the value to get. + * @return A pair containing the untyped pointer to the value at position and + * the value's byte length. + **/ + template <bool check_null = true> + inline std::pair<const void*, std::size_t> getUntypedValueAndByteLength(const std::size_t position) const { + DCHECK_LT(position, actual_length_); + return (check_null && null_bitmap_ && null_bitmap_->getBit(position)) + ? std::make_pair(nullptr, 0) + : std::make_pair(static_cast<const char*>(values_) + (position * type_length_), type_length_); + } + + /** * @brief Get a value in this NativeColumnVector as a TypedValue. * * @param position The position of the value to get. 
@@ -455,6 +471,25 @@ class IndirectColumnVector : public ColumnVector { } /** + * @brief Get the untyped pointer to a value as well as the value's byte length + * in this IndirectColumnVector as a pair. + * + * @param position The position of the value to get. + * @return A pair containing the untyped pointer to the value at position and + * the value's byte length. + **/ + template <bool check_null = true> + inline std::pair<const void*, std::size_t> getUntypedValueAndByteLength(const std::size_t position) const { + DCHECK_LT(position, values_.size()); + if (check_null && type_is_nullable_ && values_[position].isNull()) { + return std::make_pair(nullptr, 0); + } else { + const TypedValue &value = values_[position]; + return std::make_pair(value.getDataPtr(), value.getDataSize()); + } + } + + /** * @brief Get a value in this IndirectColumnVector as a TypedValue. * * @param position The position of the value to get. http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/b7150fb1/types/containers/ColumnVectorsValueAccessor.hpp ---------------------------------------------------------------------- diff --git a/types/containers/ColumnVectorsValueAccessor.hpp b/types/containers/ColumnVectorsValueAccessor.hpp index 2300f3b..19e57a9 100644 --- a/types/containers/ColumnVectorsValueAccessor.hpp +++ b/types/containers/ColumnVectorsValueAccessor.hpp @@ -126,6 +126,11 @@ class ColumnVectorsValueAccessor : public ValueAccessor { return getUntypedValueAtAbsolutePosition<check_null>(attr_id, current_position_); } + template <bool check_null = true> + inline std::pair<const void*, std::size_t> getUntypedValueAndByteLength(const attribute_id attr_id) const { + return getUntypedValueAndByteLengthAtAbsolutePosition<check_null>(attr_id, current_position_); + } + inline TypedValue getTypedValue(const attribute_id attr_id) const { return getTypedValueAtAbsolutePosition(attr_id, current_position_); } @@ -142,6 +147,18 @@ class ColumnVectorsValueAccessor : public ValueAccessor { 
} } + template <bool check_null = true> + inline std::pair<const void*, std::size_t> getUntypedValueAndByteLengthAtAbsolutePosition(const attribute_id attr_id, + const tuple_id tid) const { + DCHECK(attributeIdInRange(attr_id)); + DCHECK(tupleIdInRange(tid)); + if (column_native_[attr_id]) { + return static_cast<const NativeColumnVector&>(*columns_[attr_id]).getUntypedValueAndByteLength<check_null>(tid); + } else { + return static_cast<const IndirectColumnVector&>(*columns_[attr_id]).getUntypedValueAndByteLength<check_null>(tid); + } + } + inline TypedValue getTypedValueAtAbsolutePosition(const attribute_id attr_id, const tuple_id tid) const { DCHECK(attributeIdInRange(attr_id)); http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/b7150fb1/utility/BloomFilter.hpp ---------------------------------------------------------------------- diff --git a/utility/BloomFilter.hpp b/utility/BloomFilter.hpp index 8d62da9..749d33a 100644 --- a/utility/BloomFilter.hpp +++ b/utility/BloomFilter.hpp @@ -23,6 +23,7 @@ #include <algorithm> #include <cstddef> #include <cstdint> +#include <cstring> #include <memory> #include <utility> #include <vector> @@ -41,11 +42,358 @@ namespace quickstep { * @{ */ +class BloomFilterOriginal; +class BloomFilterBlocked; +typedef BloomFilterBlocked BloomFilter; + +/** + * @brief A "blocked" version of Bloom Filter based on this paper: + * Putze, Felix, Peter Sanders, and Johannes Singler. + * "Cache-, hash-and space-efficient bloom filters." + * International Workshop on Experimental and Efficient Algorithms. + * Springer Berlin Heidelberg, 2007. 
+ **/ +class BloomFilterBlocked { + public: + static const std::uint8_t kNumBitsPerByte = 8; + static const std::uint8_t kMaxNumHashFns = 4; + + // This union allows us to read/write position in convenient fashion, + // through nested structs and their bitfield members + // + // A position can simply be a 32-bit hash + // Or it can be a cache line (block of 512 bits) and position within it + // Or it can be a byte (block of 8 bits) and position within it + union Position { + std::uint32_t hash; + struct CacheLinePosition { + unsigned index_in_line : 9; + unsigned line_num : 23; + } cache_line_pos; + struct BytePosition { + unsigned index_in_byte : 3; + unsigned byte_num : 29; + } byte_pos; + }; + + // This Bloom filter implementation requires the bit array to be a + // multiple of the cache-line size. So we either have to round up to a + // multiple (default behavior) or round down to a multiple. + // Rounding up is usually preferable but rounding down is necessary when + // we are given a bit array that we don't control the size of, in the + // constructor. + static std::uint64_t getNearestAllowedSize( + const std::uint64_t approx_size, + bool round_down = false) { + if (round_down) + return (approx_size / kCacheLineBytes) * kCacheLineBytes; + return ((approx_size + kCacheLineBytes - 1)/ kCacheLineBytes) * kCacheLineBytes; + } + + + /** + * @brief Constructor. + * @note When no bit_array is being passed to the constructor, + * then the bit_array is owned and managed by this class. + * + * @param hash_fn_count The number of hash functions used by this bloom filter. + * @param bit_array_size_in_bytes Size of the bit array. + **/ + BloomFilterBlocked(const std::uint8_t hash_fn_count, + const std::uint64_t bit_array_size_in_bytes) + : hash_fn_count_(hash_fn_count), + array_size_in_bytes_(getNearestAllowedSize(bit_array_size_in_bytes)), + is_bit_array_owner_(true), + bit_array_(new std::uint8_t[array_size_in_bytes_]) { + reset(); + } + + /** + * @brief Constructor. 
+ * @note When a bit_array is passed as an argument to the constructor, + * then the ownership of the bit array lies with the caller. + * + * @param hash_fn_count The number of hash functions used by this bloom filter. + * @param bit_array_size_in_bytes Size of the bit array. + * @param bit_array A pointer to the memory region that is used to store bit array. + * @param is_initialized A boolean that indicates whether to zero-out the region + * before use or not. + **/ + BloomFilterBlocked(const std::uint8_t hash_fn_count, + const std::uint64_t bit_array_size_in_bytes, + std::uint8_t *bit_array, + const bool is_initialized) + : hash_fn_count_(hash_fn_count), + array_size_in_bytes_(getNearestAllowedSize(bit_array_size_in_bytes, true)), + is_bit_array_owner_(false), + bit_array_(bit_array) { // Owned by the calling method. + if (!is_initialized) { + reset(); + } + } + + /** + * @brief Constructor. + * @note When a bloom filter proto is passed as an initializer, + * then the bit_array is owned and managed by this class. + * + * @param bloom_filter_proto The protobuf representation of a + * bloom filter configuration. + **/ + explicit BloomFilterBlocked(const serialization::BloomFilter &bloom_filter_proto) + : hash_fn_count_(bloom_filter_proto.number_of_hashes()), + array_size_in_bytes_(bloom_filter_proto.bloom_filter_size()), + is_bit_array_owner_(true), + bit_array_(new std::uint8_t[array_size_in_bytes_]) { + reset(); + } + + /** + * @brief Destructor. + **/ + ~BloomFilterBlocked() { + if (is_bit_array_owner_) { + bit_array_.reset(); + } else { + bit_array_.release(); + } + } + + static bool ProtoIsValid(const serialization::BloomFilter &bloom_filter_proto) { + return bloom_filter_proto.IsInitialized(); + } + + /** + * @brief Zeros out the contents of the bit array. + **/ + inline void reset() { + // Initialize the bit_array with all zeros. 
+ std::fill_n(bit_array_.get(), array_size_in_bytes_, 0x00); + inserted_element_count_ = 0; + } + + /** + * @brief Get the number of hash functions used in this bloom filter. + * + * @return Returns the number of hash functions. + **/ + inline std::uint8_t getNumberOfHashes() const { + return hash_fn_count_; + } + + /** + * @brief Get the size of the bit array in bytes for this bloom filter. + * + * @return Returns the bit array size (in bytes). + **/ + inline std::uint64_t getBitArraySize() const { + return array_size_in_bytes_; + } + + /** + * @brief Get the constant pointer to the bit array for this bloom filter. + * + * @return Returns constant pointer to the bit array. + **/ + inline const std::uint8_t* getBitArray() const { + return bit_array_.get(); + } + + template <typename T> + void insert(const T &value) { + insert(reinterpret_cast<const std::uint8_t *>(&value), sizeof(T)); + } + + /** + * @brief Inserts a given value into the bloom filter in a thread-safe manner. + * + * @param key_begin A pointer to the value being inserted. + * @param length Size of the value being inserted in bytes. + */ + inline void insert(const std::uint8_t *key_begin, const std::size_t length) { + SpinSharedMutexExclusiveLock<false> exclusive_writer_lock(bloom_filter_insert_mutex_); + insertUnSafe(key_begin, length); + } + + template <typename T> + void insertUnSafe(const T &value) { + insertUnSafe(reinterpret_cast<const std::uint8_t *>(&value), sizeof(T)); + } + + /** + * @brief Inserts a given value into the bloom filter. + * @warning This is a faster thread-unsafe version of the insert() function. + * The caller needs to ensure the thread safety. + * + * @param key_begin A pointer to the value being inserted. + * @param length Size of the value being inserted in bytes. 
+ */ + inline void insertUnSafe(const std::uint8_t *key_begin, const std::size_t length) { + Position first_pos = getFirstPosition(key_begin, length); + setBitAtPosition(first_pos); + Position other_pos; + for (std::uint8_t i = 1; i <hash_fn_count_; ++i) { + other_pos = getOtherPosition(key_begin, length, first_pos, i); + setBitAtPosition(other_pos); + } + ++inserted_element_count_; + } + + template <typename T> + bool contains(const T &value) { + return contains(reinterpret_cast<const std::uint8_t *>(&value), sizeof(T)); + } + + /** + * @brief Test membership of a given value in the bloom filter. + * If true is returned, then a value may or may not be present in the bloom filter. + * If false is returned, a value is certainly not present in the bloom filter. + * + * @note The membership test does not require any locks, because the assumption is that + * the bloom filter will only be used after it has been built. + * + * @param key_begin A pointer to the value being tested for membership. + * @param length Size of the value being tested, in bytes. + */ + inline bool contains( + const std::uint8_t *__restrict__ key_begin, + const std::size_t length) const { + Position first_pos = getFirstPosition(key_begin, length); + if (!getBitAtPosition(first_pos)) { + return false; + } + Position other_pos; + for (std::uint8_t i = 1; i < hash_fn_count_; ++i) { + other_pos = getOtherPosition(key_begin, length, first_pos, i); + if (!getBitAtPosition(other_pos)) { + return false; + } + } + return true; + } + + /** + * @brief Perform a bitwise-OR of the given Bloom filter with this bloom filter. + * Essentially, it does a union of this bloom filter with the passed bloom filter. + * + * @param bloom_filter A const pointer to the bloom filter object to do bitwise-OR with. 
+ */ + inline void bitwiseOr(const BloomFilterBlocked *bloom_filter) { + SpinSharedMutexExclusiveLock<false> exclusive_writer_lock(bloom_filter_insert_mutex_); + for (std::size_t byte_index = 0; byte_index < bloom_filter->getBitArraySize(); ++byte_index) { + (bit_array_.get())[byte_index] |= bloom_filter->getBitArray()[byte_index]; + } + } + + /** + * @brief Return the number of elements currently inserted into bloom filter. + * + * @return The number of elements inserted into bloom filter. + **/ + inline std::uint32_t element_count() const { + return inserted_element_count_; + } + + protected: + Position getFirstPosition(const std::uint8_t *begin, std::size_t length) const { + Position pos; + pos.hash = hash_identity(begin, length); + return pos; + } + + Position getOtherPosition( + const std::uint8_t *begin, + std::size_t length, + const Position first_pos, + const std::uint8_t index) const { + Position pos; + pos.hash = hash_multiplicative(begin, length, hash_fn_[index-1]); + pos.cache_line_pos.line_num = first_pos.cache_line_pos.line_num; + return pos; + } + + void fillPosition( + const std::uint8_t *begin, + std::size_t length, + const std::uint8_t index, + Position positions[]) const { + if (index == 0) + positions[0].hash = hash_identity(begin, length); + else { + positions[index].hash = hash_multiplicative(begin, length, hash_fn_[index-1]); + positions[index].cache_line_pos.line_num = positions[0].cache_line_pos.line_num; + } + } + + void setBitAtPosition(const Position &pos) { + (bit_array_.get())[pos.byte_pos.byte_num] |= (1 << pos.byte_pos.index_in_byte); + } + + bool getBitAtPosition(const Position &pos) const { + return (bit_array_.get())[pos.byte_pos.byte_num] & (1 << pos.byte_pos.index_in_byte); + } + + inline std::uint32_t hash_identity( + const std::uint8_t *__restrict__ begin, + std::size_t length) const { + std::uint32_t hash; + if (length >= 4) + hash = *reinterpret_cast<const std::uint32_t*> (begin); + else + std::memcpy(&hash, begin, length); 
+ return hash % (array_size_in_bytes_ * kNumBitsPerByte); + } + + inline std::uint32_t hash_multiplicative( + const std::uint8_t *__restrict__ begin, + std::size_t length, + const std::uint64_t multiplier) const { + std::uint32_t hash = 0; + std::size_t bytes_hashed = 0; + if (length >= 4) { + while (bytes_hashed < length) { + auto val = *reinterpret_cast<const std::uint32_t *>(begin + bytes_hashed); + hash += (multiplier * val) >> 24; + bytes_hashed += 4; + } + } + while (bytes_hashed < length) { + std::uint8_t val = *(begin + bytes_hashed); + hash += (multiplier * val) >> 24; + bytes_hashed++; + } + return hash;// % (array_size_in_bytes_ * kNumBitsPerByte); + } + + private: + const std::uint32_t hash_fn_count_; + const std::uint64_t array_size_in_bytes_; + std::uint32_t inserted_element_count_; + const bool is_bit_array_owner_; + + static constexpr std::uint64_t kKnuthGoldenRatioNumber = 2654435761; + const std::uint64_t hash_fn_[kMaxNumHashFns] = { // hash_fn_[i] is 2**(i+1) - 1 + 0x00000001 * kKnuthGoldenRatioNumber, // 0x00000003, 0x00000007, 0x0000000f, + // 0x0000001f * kKnuthGoldenRatioNumber, // 0x0000003f, 0x0000007f, 0x000000ff, + 0x000001ff * kKnuthGoldenRatioNumber, // 0x000003ff, 0x000007ff, 0x00000fff, + // 0x00001fff * kKnuthGoldenRatioNumber, // 0x00003fff, 0x00007fff, 0x0000ffff, + 0x0001ffff * kKnuthGoldenRatioNumber, // 0x0003ffff, 0x0007ffff, 0x000fffff, + // 0x001fffff * kKnuthGoldenRatioNumber, // 0x003fffff, 0x007fffff, 0x00ffffff, + 0x01ffffff * kKnuthGoldenRatioNumber, // 0x03ffffff, 0x07ffffff, 0x0fffffff, + // 0x1fffffff * kKnuthGoldenRatioNumber // 0x3fffffff, 0x7fffffff, 0xffffffff + }; + + alignas(kCacheLineBytes) std::unique_ptr<std::uint8_t> bit_array_; + alignas(kCacheLineBytes) mutable SpinSharedMutex<false> bloom_filter_insert_mutex_; + + DISALLOW_COPY_AND_ASSIGN(BloomFilterBlocked); +}; + /** * @brief A simple Bloom Filter implementation with basic primitives * based on Partow's Bloom Filter implementation. 
**/ -class BloomFilter { +class BloomFilterOriginal { public: static const uint32_t kNumBitsPerByte = 8; @@ -54,21 +402,17 @@ class BloomFilter { * @note When no bit_array is being passed to the constructor, * then the bit_array is owned and managed by this class. * - * @param random_seed A random_seed that generates unique hash functions. * @param hash_fn_count The number of hash functions used by this bloom filter. * @param bit_array_size_in_bytes Size of the bit array. **/ - BloomFilter(const std::uint64_t random_seed, - const std::size_t hash_fn_count, + BloomFilterOriginal(const std::size_t hash_fn_count, const std::uint64_t bit_array_size_in_bytes) - : random_seed_(random_seed), - hash_fn_count_(hash_fn_count), + : hash_fn_count_(hash_fn_count), array_size_in_bytes_(bit_array_size_in_bytes), array_size_(array_size_in_bytes_ * kNumBitsPerByte), bit_array_(new std::uint8_t[array_size_in_bytes_]), is_bit_array_owner_(true) { reset(); - generate_unique_hash_fn(); } /** @@ -76,20 +420,17 @@ class BloomFilter { * @note When a bit_array is passed as an argument to the constructor, * then the ownership of the bit array lies with the caller. * - * @param random_seed A random_seed that generates unique hash functions. * @param hash_fn_count The number of hash functions used by this bloom filter. * @param bit_array_size_in_bytes Size of the bit array. * @param bit_array A pointer to the memory region that is used to store bit array. * @param is_initialized A boolean that indicates whether to zero-out the region * before use or not. 
**/ - BloomFilter(const std::uint64_t random_seed, - const std::size_t hash_fn_count, + BloomFilterOriginal(const std::size_t hash_fn_count, const std::uint64_t bit_array_size_in_bytes, std::uint8_t *bit_array, const bool is_initialized) - : random_seed_(random_seed), - hash_fn_count_(hash_fn_count), + : hash_fn_count_(hash_fn_count), array_size_in_bytes_(bit_array_size_in_bytes), array_size_(bit_array_size_in_bytes * kNumBitsPerByte), bit_array_(bit_array), // Owned by the calling method. @@ -97,7 +438,6 @@ class BloomFilter { if (!is_initialized) { reset(); } - generate_unique_hash_fn(); } /** @@ -108,21 +448,19 @@ class BloomFilter { * @param bloom_filter_proto The protobuf representation of a * bloom filter configuration. **/ - explicit BloomFilter(const serialization::BloomFilter &bloom_filter_proto) - : random_seed_(bloom_filter_proto.bloom_filter_seed()), - hash_fn_count_(bloom_filter_proto.number_of_hashes()), + explicit BloomFilterOriginal(const serialization::BloomFilter &bloom_filter_proto) + : hash_fn_count_(bloom_filter_proto.number_of_hashes()), array_size_in_bytes_(bloom_filter_proto.bloom_filter_size()), array_size_(array_size_in_bytes_ * kNumBitsPerByte), bit_array_(new std::uint8_t[array_size_in_bytes_]), is_bit_array_owner_(true) { reset(); - generate_unique_hash_fn(); } /** * @brief Destructor. **/ - ~BloomFilter() { + ~BloomFilterOriginal() { if (is_bit_array_owner_) { bit_array_.reset(); } else { @@ -144,15 +482,6 @@ class BloomFilter { } /** - * @brief Get the random seed that was used to initialize this bloom filter. - * - * @return Returns the random seed. - **/ - inline std::uint64_t getRandomSeed() const { - return random_seed_; - } - - /** * @brief Get the number of hash functions used in this bloom filter. * * @return Returns the number of hash functions. @@ -195,7 +524,7 @@ class BloomFilter { // Determine all the bit positions that are required to be set. 
for (std::size_t i = 0; i < hash_fn_count_; ++i) { - compute_indices(hash_ap(key_begin, length, hash_fn_[i]), &bit_index, &bit); + compute_indices(hash_multiplicative(key_begin, length, hash_fn_[i]), &bit_index, &bit); modified_bit_positions.push_back(std::make_pair(bit_index, bit)); } @@ -240,7 +569,7 @@ class BloomFilter { std::size_t bit = 0; for (std::size_t i = 0; i < hash_fn_count_; ++i) { - compute_indices(hash_ap(key_begin, length, hash_fn_[i]), &bit_index, &bit); + compute_indices(hash_multiplicative(key_begin, length, hash_fn_[i]), &bit_index, &bit); (bit_array_.get())[bit_index / kNumBitsPerByte] |= (1 << bit); } @@ -262,7 +591,7 @@ class BloomFilter { std::size_t bit_index = 0; std::size_t bit = 0; for (std::size_t i = 0; i < hash_fn_count_; ++i) { - compute_indices(hash_ap(key_begin, length, hash_fn_[i]), &bit_index, &bit); + compute_indices(hash_multiplicative(key_begin, length, hash_fn_[i]), &bit_index, &bit); if (((bit_array_.get())[bit_index / kNumBitsPerByte] & (1 << bit)) != (1 << bit)) { return false; } @@ -276,7 +605,7 @@ class BloomFilter { * * @param bloom_filter A const pointer to the bloom filter object to do bitwise-OR with. 
*/ - inline void bitwiseOr(const BloomFilter *bloom_filter) { + inline void bitwiseOr(const BloomFilterOriginal *bloom_filter) { SpinSharedMutexExclusiveLock<false> exclusive_writer_lock(bloom_filter_insert_mutex_); for (std::size_t byte_index = 0; byte_index < bloom_filter->getBitArraySize(); ++byte_index) { (bit_array_.get())[byte_index] |= bloom_filter->getBitArray()[byte_index]; @@ -298,95 +627,28 @@ class BloomFilter { *bit = *bit_index % kNumBitsPerByte; } - void generate_unique_hash_fn() { - hash_fn_.reserve(hash_fn_count_); - const std::uint32_t predef_hash_fn_count = 128; - static const std::uint32_t predef_hash_fn[predef_hash_fn_count] = { - 0xAAAAAAAA, 0x55555555, 0x33333333, 0xCCCCCCCC, - 0x66666666, 0x99999999, 0xB5B5B5B5, 0x4B4B4B4B, - 0xAA55AA55, 0x55335533, 0x33CC33CC, 0xCC66CC66, - 0x66996699, 0x99B599B5, 0xB54BB54B, 0x4BAA4BAA, - 0xAA33AA33, 0x55CC55CC, 0x33663366, 0xCC99CC99, - 0x66B566B5, 0x994B994B, 0xB5AAB5AA, 0xAAAAAA33, - 0x555555CC, 0x33333366, 0xCCCCCC99, 0x666666B5, - 0x9999994B, 0xB5B5B5AA, 0xFFFFFFFF, 0xFFFF0000, - 0xB823D5EB, 0xC1191CDF, 0xF623AEB3, 0xDB58499F, - 0xC8D42E70, 0xB173F616, 0xA91A5967, 0xDA427D63, - 0xB1E8A2EA, 0xF6C0D155, 0x4909FEA3, 0xA68CC6A7, - 0xC395E782, 0xA26057EB, 0x0CD5DA28, 0x467C5492, - 0xF15E6982, 0x61C6FAD3, 0x9615E352, 0x6E9E355A, - 0x689B563E, 0x0C9831A8, 0x6753C18B, 0xA622689B, - 0x8CA63C47, 0x42CC2884, 0x8E89919B, 0x6EDBD7D3, - 0x15B6796C, 0x1D6FDFE4, 0x63FF9092, 0xE7401432, - 0xEFFE9412, 0xAEAEDF79, 0x9F245A31, 0x83C136FC, - 0xC3DA4A8C, 0xA5112C8C, 0x5271F491, 0x9A948DAB, - 0xCEE59A8D, 0xB5F525AB, 0x59D13217, 0x24E7C331, - 0x697C2103, 0x84B0A460, 0x86156DA9, 0xAEF2AC68, - 0x23243DA5, 0x3F649643, 0x5FA495A8, 0x67710DF8, - 0x9A6C499E, 0xDCFB0227, 0x46A43433, 0x1832B07A, - 0xC46AFF3C, 0xB9C8FFF0, 0xC9500467, 0x34431BDF, - 0xB652432B, 0xE367F12B, 0x427F4C1B, 0x224C006E, - 0x2E7E5A89, 0x96F99AA5, 0x0BEB452A, 0x2FD87C39, - 0x74B2E1FB, 0x222EFD24, 0xF357F60C, 0x440FCB1E, - 0x8BBE030F, 0x6704DC29, 0x1144D12F, 
0x948B1355, - 0x6D8FD7E9, 0x1C11A014, 0xADD1592F, 0xFB3C712E, - 0xFC77642F, 0xF9C4CE8C, 0x31312FB9, 0x08B0DD79, - 0x318FA6E7, 0xC040D23D, 0xC0589AA7, 0x0CA5C075, - 0xF874B172, 0x0CF914D5, 0x784D3280, 0x4E8CFEBC, - 0xC569F575, 0xCDB2A091, 0x2CC016B4, 0x5C5F4421 - }; - if (hash_fn_count_ <= predef_hash_fn_count) { - std::copy(predef_hash_fn, predef_hash_fn + hash_fn_count_, hash_fn_.begin()); - for (std::uint32_t i = 0; i < hash_fn_.size(); ++i) { - hash_fn_[i] = hash_fn_[i] * hash_fn_[(i + 3) % hash_fn_count_] + static_cast<std::uint32_t>(random_seed_); + inline std::uint32_t hash_multiplicative( + const std::uint8_t *begin, + std::size_t remaining_length, + const std::uint64_t multiplier) const { + std::uint32_t hash = 0; + std::size_t bytes_hashed = 0; + if (remaining_length >= 4) { + while (bytes_hashed < remaining_length) { + auto val = *reinterpret_cast<const std::uint32_t *>(begin + bytes_hashed); + hash += (multiplier * val) >> 32; + bytes_hashed += 4; } - } else { - LOG(FATAL) << "Requested number of hash functions is too large."; } - } - - inline std::uint32_t hash_ap(const std::uint8_t *begin, std::size_t remaining_length, std::uint32_t hash) const { - const std::uint8_t *itr = begin; - std::uint32_t loop = 0; - while (remaining_length >= 8) { - const std::uint32_t &i1 = *(reinterpret_cast<const std::uint32_t*>(itr)); itr += sizeof(std::uint32_t); - const std::uint32_t &i2 = *(reinterpret_cast<const std::uint32_t*>(itr)); itr += sizeof(std::uint32_t); - hash ^= (hash << 7) ^ i1 * (hash >> 3) ^ (~((hash << 11) + (i2 ^ (hash >> 5)))); - remaining_length -= 8; - } - if (remaining_length) { - if (remaining_length >= 4) { - const std::uint32_t &i = *(reinterpret_cast<const std::uint32_t*>(itr)); - if (loop & 0x01) { - hash ^= (hash << 7) ^ i * (hash >> 3); - } else { - hash ^= (~((hash << 11) + (i ^ (hash >> 5)))); - } - ++loop; - remaining_length -= 4; - itr += sizeof(std::uint32_t); - } - if (remaining_length >= 2) { - const std::uint16_t &i = 
*(reinterpret_cast<const std::uint16_t*>(itr)); - if (loop & 0x01) { - hash ^= (hash << 7) ^ i * (hash >> 3); - } else { - hash ^= (~((hash << 11) + (i ^ (hash >> 5)))); - } - ++loop; - remaining_length -= 2; - itr += sizeof(std::uint16_t); - } - if (remaining_length) { - hash += ((*itr) ^ (hash * 0xA5A5A5A5)) + loop; - } + while (bytes_hashed < remaining_length) { + std::uint8_t val = *(begin + bytes_hashed); + hash += (multiplier * val) >> 32; + bytes_hashed++; } return hash; } private: - const std::uint64_t random_seed_; - std::vector<std::uint32_t> hash_fn_; const std::uint32_t hash_fn_count_; std::uint64_t array_size_in_bytes_; std::uint64_t array_size_; @@ -394,9 +656,21 @@ class BloomFilter { std::uint32_t inserted_element_count_; const bool is_bit_array_owner_; + static constexpr std::uint64_t kKnuthGoldenRatioNumber = 2654435761; + static constexpr std::size_t kMaxNumHashFns = 8; + const std::uint64_t hash_fn_[kMaxNumHashFns] = { // hash_fn_[i] is 2**(i+1) - 1 + 0x00000001 * kKnuthGoldenRatioNumber, // 0x00000003, 0x00000007, 0x0000000f, + 0x0000001f * kKnuthGoldenRatioNumber, // 0x0000003f, 0x0000007f, 0x000000ff, + 0x000001ff * kKnuthGoldenRatioNumber, // 0x000003ff, 0x000007ff, 0x00000fff, + 0x00001fff * kKnuthGoldenRatioNumber, // 0x00003fff, 0x00007fff, 0x0000ffff, + 0x0001ffff * kKnuthGoldenRatioNumber, // 0x0003ffff, 0x0007ffff, 0x000fffff, + 0x001fffff * kKnuthGoldenRatioNumber, // 0x003fffff, 0x007fffff, 0x00ffffff, + 0x01ffffff * kKnuthGoldenRatioNumber, // 0x03ffffff, 0x07ffffff, 0x0fffffff, + 0x1fffffff * kKnuthGoldenRatioNumber // 0x3fffffff, 0x7fffffff, 0xffffffff + }; alignas(kCacheLineBytes) mutable SpinSharedMutex<false> bloom_filter_insert_mutex_; - DISALLOW_COPY_AND_ASSIGN(BloomFilter); + DISALLOW_COPY_AND_ASSIGN(BloomFilterOriginal); }; /** @} */ http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/b7150fb1/utility/BloomFilter.proto ---------------------------------------------------------------------- diff --git 
a/utility/BloomFilter.proto b/utility/BloomFilter.proto index 0f67878..1a8dbf2 100644 --- a/utility/BloomFilter.proto +++ b/utility/BloomFilter.proto @@ -23,10 +23,8 @@ message BloomFilter { // The default values were determined from empirical experiments. // These values control the amount of false positivity that // is expected from Bloom Filter. - // - Default seed for initializing family of hashes = 0xA5A5A5A55A5A5A5A. // - Default bloom filter size = 10 KB. // - Default number of hash functions used in bloom filter = 5. - optional fixed64 bloom_filter_seed = 1 [default = 0xA5A5A5A55A5A5A5A]; - optional uint32 bloom_filter_size = 2 [default = 10000]; - optional uint32 number_of_hashes = 3 [default = 5]; + optional uint32 bloom_filter_size = 1 [default = 10000]; + optional uint32 number_of_hashes = 2 [default = 5]; } http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/b7150fb1/utility/BloomFilterAdapter.hpp ---------------------------------------------------------------------- diff --git a/utility/BloomFilterAdapter.hpp b/utility/BloomFilterAdapter.hpp new file mode 100644 index 0000000..f094307 --- /dev/null +++ b/utility/BloomFilterAdapter.hpp @@ -0,0 +1,142 @@ +/** + * Copyright 2016, Quickstep Research Group, Computer Sciences Department, + * University of Wisconsin—Madison. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ **/ + +#ifndef QUICKSTEP_UTILITY_BLOOM_FILTER_ADAPTER_HPP +#define QUICKSTEP_UTILITY_BLOOM_FILTER_ADAPTER_HPP + +#include <algorithm> +#include <cstddef> +#include <cstdint> +#include <memory> +#include <utility> +#include <vector> + +#include "catalog/CatalogTypedefs.hpp" +#include "utility/BloomFilter.hpp" +#include "utility/Macros.hpp" + +#include "glog/logging.h" + +namespace quickstep { + +/** \addtogroup Utility + * @{ + */ + +class BloomFilterAdapter { + public: + BloomFilterAdapter(const std::vector<const BloomFilter*> &bloom_filters, + const std::vector<attribute_id> &attribute_ids, + const std::vector<std::size_t> &attr_sizes) { + DCHECK_EQ(bloom_filters.size(), attribute_ids.size()); + DCHECK_EQ(bloom_filters.size(), attr_sizes.size()); + + bloom_filter_entries_.reserve(bloom_filters.size()); + for (std::size_t i = 0; i < bloom_filters.size(); ++i) { + bloom_filter_entries_.emplace_back( + new BloomFilterEntry( + bloom_filters[i], attribute_ids[i], attr_sizes[i])); + } + } + + ~BloomFilterAdapter() { + for (auto &entry : bloom_filter_entries_) { + delete entry; + } + } + + template <bool adapt_filters, typename ValueAccessorT> + inline std::size_t bulkProbe(const ValueAccessorT *accessor, + std::vector<tuple_id> &batch, + const std::size_t batch_size) { + std::size_t out_size = batch_size; + for (auto &entry : bloom_filter_entries_) { + out_size = bulkProbeBloomFilterEntry<adapt_filters>(*entry, accessor, batch, out_size); + } + adaptEntryOrder(); + return out_size; + } + + private: + struct BloomFilterEntry { + BloomFilterEntry(const BloomFilter *in_bloom_filter, + const attribute_id &in_attribute_id, + const std::size_t &in_attribute_size) + : bloom_filter(in_bloom_filter), + attribute_id(in_attribute_id), + attribute_size(in_attribute_size), + miss(0), + cnt(0) { + } + + static bool isBetterThan(const BloomFilterEntry *a, + const BloomFilterEntry *b) { + return a->miss_rate > b->miss_rate; + } + + const BloomFilter *bloom_filter; + const 
attribute_id attribute_id; + const std::size_t attribute_size; + std::uint32_t miss; + std::uint32_t cnt; + float miss_rate; + }; + + template <bool adapt_filters, typename ValueAccessorT> + inline std::size_t bulkProbeBloomFilterEntry( + BloomFilterEntry &entry, + const ValueAccessorT *accessor, + std::vector<tuple_id> &batch, + const std::size_t in_size) { + std::size_t out_size = 0; + const BloomFilter *bloom_filter = entry.bloom_filter; + + for (std::size_t t = 0; t < in_size; ++t) { + const tuple_id tid = batch[t]; + const auto value = static_cast<const std::uint8_t*>( + accessor->getUntypedValueAtAbsolutePosition(entry.attribute_id, tid)); + if (bloom_filter->contains(value, entry.attribute_size)) { + batch[out_size] = tid; + ++out_size; + } + } + if (adapt_filters) { + entry.cnt += in_size; + entry.miss += (in_size - out_size); + } + return out_size; + } + + inline void adaptEntryOrder() { + for (auto &entry : bloom_filter_entries_) { + entry->miss_rate = static_cast<float>(entry->miss) / entry->cnt; + } + std::sort(bloom_filter_entries_.begin(), + bloom_filter_entries_.end(), + BloomFilterEntry::isBetterThan); + } + + std::vector<BloomFilterEntry *> bloom_filter_entries_; + + DISALLOW_COPY_AND_ASSIGN(BloomFilterAdapter); +}; + +/** @} */ + +} // namespace quickstep + +#endif // QUICKSTEP_UTILITY_BLOOM_FILTER_ADAPTER_HPP http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/b7150fb1/utility/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/utility/CMakeLists.txt b/utility/CMakeLists.txt index ae1179d..46389f0 100644 --- a/utility/CMakeLists.txt +++ b/utility/CMakeLists.txt @@ -161,6 +161,7 @@ add_library(quickstep_utility_Alignment ../empty_src.cpp Alignment.hpp) add_library(quickstep_utility_BitManipulation ../empty_src.cpp BitManipulation.hpp) add_library(quickstep_utility_BitVector ../empty_src.cpp BitVector.hpp) add_library(quickstep_utility_BloomFilter ../empty_src.cpp BloomFilter.hpp) 
+add_library(quickstep_utility_BloomFilterAdapter ../empty_src.cpp BloomFilterAdapter.hpp) add_library(quickstep_utility_BloomFilter_proto ${quickstep_utility_BloomFilter_proto_srcs} ${quickstep_utility_BloomFilter_proto_hdrs}) @@ -168,6 +169,8 @@ add_library(quickstep_utility_CalculateInstalledMemory CalculateInstalledMemory. add_library(quickstep_utility_Cast ../empty_src.cpp Cast.hpp) add_library(quickstep_utility_CheckSnprintf ../empty_src.cpp CheckSnprintf.hpp) add_library(quickstep_utility_DAG ../empty_src.cpp DAG.hpp) +add_library(quickstep_utility_DisjointTreeForest ../empty_src.cpp DisjointTreeForest.hpp) +add_library(quickstep_utility_EventProfiler EventProfiler.cpp EventProfiler.hpp) add_library(quickstep_utility_EqualsAnyConstant ../empty_src.cpp EqualsAnyConstant.hpp) add_library(quickstep_utility_ExecutionDAGVisualizer ExecutionDAGVisualizer.cpp @@ -221,6 +224,10 @@ target_link_libraries(quickstep_utility_BloomFilter quickstep_threading_SpinSharedMutex quickstep_utility_BloomFilter_proto quickstep_utility_Macros) +target_link_libraries(quickstep_utility_BloomFilterAdapter + quickstep_catalog_CatalogTypedefs + quickstep_utility_BloomFilter + quickstep_utility_Macros) target_link_libraries(quickstep_utility_BloomFilter_proto ${PROTOBUF_LIBRARY}) target_link_libraries(quickstep_utility_CalculateInstalledMemory @@ -230,6 +237,9 @@ target_link_libraries(quickstep_utility_CheckSnprintf target_link_libraries(quickstep_utility_DAG glog quickstep_utility_Macros) +target_link_libraries(quickstep_utility_DisjointTreeForest) +target_link_libraries(quickstep_utility_EventProfiler + quickstep_threading_Mutex) target_link_libraries(quickstep_utility_ExecutionDAGVisualizer quickstep_catalog_CatalogRelationSchema quickstep_queryexecution_QueryExecutionTypedefs @@ -312,11 +322,14 @@ target_link_libraries(quickstep_utility quickstep_utility_BitManipulation quickstep_utility_BitVector quickstep_utility_BloomFilter + quickstep_utility_BloomFilterAdapter 
quickstep_utility_BloomFilter_proto quickstep_utility_CalculateInstalledMemory quickstep_utility_Cast quickstep_utility_CheckSnprintf quickstep_utility_DAG + quickstep_utility_DisjointTreeForest + quickstep_utility_EventProfiler quickstep_utility_EqualsAnyConstant quickstep_utility_ExecutionDAGVisualizer quickstep_utility_Glob http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/b7150fb1/utility/DisjointTreeForest.hpp ---------------------------------------------------------------------- diff --git a/utility/DisjointTreeForest.hpp b/utility/DisjointTreeForest.hpp new file mode 100644 index 0000000..f5722ba --- /dev/null +++ b/utility/DisjointTreeForest.hpp @@ -0,0 +1,116 @@ +/** + * Copyright 2016, Quickstep Research Group, Computer Sciences Department, + * University of Wisconsin—Madison. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + **/ + +#ifndef QUICKSTEP_UTILITY_DISJOINT_TREE_FOREST_HPP_ +#define QUICKSTEP_UTILITY_DISJOINT_TREE_FOREST_HPP_ + +#include <cstddef> +#include <limits> +#include <utility> +#include <unordered_map> + +namespace quickstep { + +/** \addtogroup Utility + * @{ + */ + +/** + * @brief A.k.a. union-find set. 
+ */ +template <typename ElementT, + class MapperT = std::unordered_map<ElementT, std::size_t>> +class DisjointTreeForest { + public: + inline bool hasElement(const ElementT &element) const { + return elements_map_.find(element) != elements_map_.end(); + } + + inline void makeSet(const ElementT &element) { + if (!hasElement(element)) { + std::size_t loc = nodes_.size(); + nodes_.emplace_back(0, loc); + elements_map_.emplace(element, loc); + } + } + + inline std::size_t find(const ElementT &element) { + const std::size_t node_id = elements_map_.at(element); + std::size_t root_id = node_id; + std::size_t parent_id; + while ((parent_id = nodes_[root_id].parent) != root_id) { + root_id = parent_id; + } + compress_path(node_id, root_id); + return root_id; + } + + inline void merge(const ElementT &element1, const ElementT &element2) { + std::size_t root_id1 = find(element1); + std::size_t root_id2 = find(element2); + if (root_id1 != root_id2) { + Node &n1 = nodes_[root_id1]; + Node &n2 = nodes_[root_id2]; + if (n1.rank > n2.rank) { + n2.parent = root_id1; + } else if (n1.rank < n2.rank) { + n1.parent = root_id2; + } else { + n1.parent = root_id2; + n2.rank += 1; + } + } + } + + inline bool isConnected(const ElementT &element1, const ElementT &element2) { + return find(element1) == find(element2); + } + + private: + struct Node { + Node(const std::size_t rank_in, const std::size_t parent_in) + : rank(rank_in), parent(parent_in) { + } + std::size_t rank; + std::size_t parent; + }; + + inline void compress_path(const std::size_t leaf_node_id, const std::size_t root_node_id) { + std::size_t node_id = leaf_node_id; + std::size_t max_rank = 0; + while (node_id != root_node_id) { + const Node &node = nodes_[node_id]; + max_rank = std::max(max_rank, node.rank); + + const std::size_t parent_id = node.parent; + nodes_[node_id].parent = root_node_id; + node_id = parent_id; + } + nodes_[root_node_id].rank = max_rank + 1; + } + + std::vector<Node> nodes_; + MapperT elements_map_; + + 
static constexpr std::size_t kInvalid = std::numeric_limits<std::size_t>::max(); +}; + +/** @} */ + +} // namespace quickstep + +#endif // QUICKSTEP_UTILITY_DISJOINT_TREE_FOREST_HPP_ \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/b7150fb1/utility/EventProfiler.cpp ---------------------------------------------------------------------- diff --git a/utility/EventProfiler.cpp b/utility/EventProfiler.cpp new file mode 100644 index 0000000..728ebff --- /dev/null +++ b/utility/EventProfiler.cpp @@ -0,0 +1,29 @@ +/** + * Copyright 2016, Quickstep Research Group, Computer Sciences Department, + * University of Wisconsin—Madison. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + **/ + +#include "utility/EventProfiler.hpp" + +#include <cstddef> +#include <string> +#include <vector> + +namespace quickstep { + +EventProfiler<int, std::size_t> simple_profiler; +EventProfiler<std::size_t> relop_profiler; + +} // namespace quickstep http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/b7150fb1/utility/EventProfiler.hpp ---------------------------------------------------------------------- diff --git a/utility/EventProfiler.hpp b/utility/EventProfiler.hpp new file mode 100644 index 0000000..70024e6 --- /dev/null +++ b/utility/EventProfiler.hpp @@ -0,0 +1,188 @@ +/** + * Copyright 2016, Quickstep Research Group, Computer Sciences Department, + * University of Wisconsin—Madison. 
+ * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + **/ + +#ifndef QUICKSTEP_UTILITY_EVENT_PROFILER_HPP_ +#define QUICKSTEP_UTILITY_EVENT_PROFILER_HPP_ + +#include <chrono> +#include <cstddef> +#include <cstring> +#include <ctime> +#include <iomanip> +#include <map> +#include <ostream> +#include <thread> +#include <type_traits> +#include <utility> +#include <vector> + +#include "threading/Mutex.hpp" + +#include "glog/logging.h" + +namespace quickstep { + +/** \addtogroup Utility + * @{ + */ + +using clock = std::chrono::steady_clock; + +template <typename TagT, typename ...PayloadT> +class EventProfiler { + + public: + EventProfiler() + : zero_time_(clock::now()) { + } + + struct EventInfo { + clock::time_point start_time; + clock::time_point end_time; + bool is_finished; + std::tuple<PayloadT...> payload; + + explicit EventInfo(const clock::time_point &start_time_in) + : start_time(start_time_in), + is_finished(false) { + } + + EventInfo() + : start_time(clock::now()), + is_finished(false) { + } + + inline void setPayload(PayloadT &&...in_payload) { + payload = std::make_tuple(in_payload...); + } + + inline void endEvent() { + end_time = clock::now(); + is_finished = true; + } + }; + + struct EventContainer { + EventContainer() + : context(0) {} + + inline void startEvent(const TagT &tag) { + events[tag].emplace_back(clock::now()); + } + + inline void endEvent(const TagT &tag) { + auto &event_info = events.at(tag).back(); + event_info.is_finished = true; + 
event_info.end_time = clock::now(); + } + + inline std::vector<EventInfo> *getEventLine(const TagT &tag) { + return &events[tag]; + } + + inline void setContext(int context_in) { + context = context_in; + } + + inline int getContext() const { + return context; + } + + std::map<TagT, std::vector<EventInfo>> events; + int context; + }; + + EventContainer *getContainer() { + MutexLock lock(mutex_); + return &thread_map_[std::this_thread::get_id()]; + } + + void writeToStream(std::ostream &os) const { + time_t rawtime; + time(&rawtime); + char event_id[32]; + strftime(event_id, sizeof event_id, "%Y-%m-%d %H:%M:%S", localtime(&rawtime)); + + int thread_id = 0; + for (const auto &thread_ctx : thread_map_) { + for (const auto &event_group : thread_ctx.second.events) { + for (const auto &event_info : event_group.second) { + CHECK(event_info.is_finished) << "Unfinished profiling event"; + + os << std::setprecision(12) + << event_id << "," + << thread_id << "," << event_group.first << ","; + + PrintTuple(os, event_info.payload, ","); + + os << std::chrono::duration<double>(event_info.start_time - zero_time_).count() + << "," + << std::chrono::duration<double>(event_info.end_time - zero_time_).count() + << "\n"; + } + } + ++thread_id; + } + } + + void clear() { + zero_time_ = clock::now(); + thread_map_.clear(); + } + + const std::map<std::thread::id, EventContainer> &containers() { + return thread_map_; + } + + const clock::time_point &zero_time() { + return zero_time_; + } + + private: + template<class Tuple, std::size_t N> + struct TuplePrinter { + static void Print(std::ostream &os, const Tuple &t, const std::string &sep) { + TuplePrinter<Tuple, N-1>::Print(os, t, sep); + os << std::get<N-1>(t) << sep; + } + }; + + template<class Tuple> + struct TuplePrinter<Tuple, 1> { + static void Print(std::ostream &os, const Tuple &t, const std::string &sep) { + os << std::get<0>(t) << sep; + } + }; + + template<class... 
Args> + static void PrintTuple(std::ostream &os, const std::tuple<Args...>& t, const std::string &sep) { + TuplePrinter<decltype(t), sizeof...(Args)>::Print(os, t, sep); + } + + clock::time_point zero_time_; + std::map<std::thread::id, EventContainer> thread_map_; + Mutex mutex_; +}; + +extern EventProfiler<int, std::size_t> simple_profiler; +extern EventProfiler<std::size_t> relop_profiler; + +/** @} */ + +} // namespace quickstep + +#endif // QUICKSTEP_UTILITY_EVENT_PROFILER_HPP_ http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/b7150fb1/utility/PlanVisualizer.cpp ---------------------------------------------------------------------- diff --git a/utility/PlanVisualizer.cpp b/utility/PlanVisualizer.cpp index 50cf7f0..b90a8dc 100644 --- a/utility/PlanVisualizer.cpp +++ b/utility/PlanVisualizer.cpp @@ -21,6 +21,7 @@ #include <cstddef> #include <memory> +#include <set> #include <sstream> #include <string> #include <unordered_map> @@ -30,6 +31,7 @@ #include "query_optimizer/cost_model/StarSchemaSimpleCostModel.hpp" #include "query_optimizer/expressions/AttributeReference.hpp" +#include "query_optimizer/physical/Aggregate.hpp" #include "query_optimizer/physical/HashJoin.hpp" #include "query_optimizer/physical/Physical.hpp" #include "query_optimizer/physical/PhysicalType.hpp" @@ -103,6 +105,10 @@ void PlanVisualizer::visit(const P::PhysicalPtr &input) { int node_id = ++id_counter_; node_id_map_.emplace(input, node_id); + std::set<E::ExprId> referenced_ids; + for (const auto &attr : input->getReferencedAttributes()) { + referenced_ids.emplace(attr->id()); + } for (const auto &child : input->children()) { visit(child); @@ -113,10 +119,8 @@ void PlanVisualizer::visit(const P::PhysicalPtr &input) { edge_info.src_node_id = child_id; edge_info.dst_node_id = node_id; - // Print output attributes except for TableReference -- there are just too many - // attributes out of TableReference. 
- if (child->getPhysicalType() != P::PhysicalType::kTableReference) { - for (const auto &attr : child->getOutputAttributes()) { + for (const auto &attr : child->getOutputAttributes()) { + if (referenced_ids.find(attr->id()) != referenced_ids.end()) { edge_info.labels.emplace_back(attr->attribute_alias()); } } @@ -147,6 +151,36 @@ void PlanVisualizer::visit(const P::PhysicalPtr &input) { node_info.labels.emplace_back( left_attributes[i]->attribute_alias() + " = " + right_attributes[i]->attribute_alias()); } + if (hash_join->left()->impliesUniqueAttributes(left_attributes)) { + node_info.labels.emplace_back("LEFT join attrs unique"); + } + if (hash_join->right()->impliesUniqueAttributes(right_attributes)) { + node_info.labels.emplace_back("RIGHT join attrs unique"); + } + + const auto &bf_config = hash_join->bloom_filter_config(); + for (const auto &bf : bf_config.build_side_bloom_filters) { + node_info.labels.emplace_back( + std::string("[BF build] ") + bf.attribute->attribute_alias()); + } + for (const auto &bf : bf_config.probe_side_bloom_filters) { + node_info.labels.emplace_back( + std::string("[BF probe] ") + bf.attribute->attribute_alias()); + } + + break; + } + case P::PhysicalType::kAggregate: { + const P::AggregatePtr aggregate = + std::static_pointer_cast<const P::Aggregate>(input); + node_info.labels.emplace_back(input->getName()); + + const auto &bf_config = aggregate->bloom_filter_config(); + for (const auto &bf : bf_config.probe_side_bloom_filters) { + node_info.labels.emplace_back( + std::string("[BF probe] ") + bf.attribute->attribute_alias()); + } + break; } default: {