http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/1e7a92a9/storage/CollisionFreeVectorTable.hpp ---------------------------------------------------------------------- diff --git a/storage/CollisionFreeVectorTable.hpp b/storage/CollisionFreeVectorTable.hpp new file mode 100644 index 0000000..25f7786 --- /dev/null +++ b/storage/CollisionFreeVectorTable.hpp @@ -0,0 +1,621 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + **/ + +#ifndef QUICKSTEP_STORAGE_COLLISION_FREE_AGGREGATION_STATE_HASH_TABLE_HPP_ +#define QUICKSTEP_STORAGE_COLLISION_FREE_AGGREGATION_STATE_HASH_TABLE_HPP_ + +#include <algorithm> +#include <atomic> +#include <cstddef> +#include <cstdint> +#include <cstring> +#include <memory> +#include <type_traits> +#include <utility> +#include <vector> + +#include "catalog/CatalogTypedefs.hpp" +#include "expressions/aggregation/AggregationHandle.hpp" +#include "expressions/aggregation/AggregationID.hpp" +#include "storage/HashTableBase.hpp" +#include "storage/StorageBlob.hpp" +#include "storage/StorageConstants.hpp" +#include "storage/ValueAccessor.hpp" +#include "storage/ValueAccessorMultiplexer.hpp" +#include "types/Type.hpp" +#include "types/TypeID.hpp" +#include "types/TypedValue.hpp" +#include "types/containers/ColumnVector.hpp" +#include "utility/BarrieredReadWriteConcurrentBitVector.hpp" +#include "utility/Macros.hpp" + +#include "glog/logging.h" + +namespace quickstep { + +class ColumnVectorsValueAccessor; +class StorageManager; + +/** \addtogroup Storage + * @{ + */ + +class CollisionFreeVectorTable : public AggregationStateHashTableBase { + public: + CollisionFreeVectorTable( + const std::vector<const Type *> &key_types, + const std::size_t num_entries, + const std::vector<AggregationHandle *> &handles, + StorageManager *storage_manager); + + ~CollisionFreeVectorTable() override; + + void destroyPayload() override; + + inline std::size_t getNumInitializationPartitions() const { + return num_init_partitions_; + } + + inline std::size_t getNumFinalizationPartitions() const { + return num_finalize_partitions_; + } + + inline std::size_t getNumTuplesInPartition( + const std::size_t partition_id) const { + const std::size_t start_position = + calculatePartitionStartPosition(partition_id); + const std::size_t end_position = + calculatePartitionEndPosition(partition_id); + return existence_map_->onesCountInRange(start_position, end_position); + } + + inline void initialize(const std::size_t partition_id) { + const std::size_t memory_segment_size = + (memory_size_ + num_init_partitions_ - 1) / num_init_partitions_; + const std::size_t memory_start = memory_segment_size * partition_id; + std::memset(reinterpret_cast<char *>(blob_->getMemoryMutable()) + memory_start, + 0, + std::min(memory_segment_size, memory_size_ - memory_start)); + } + 
+ bool upsertValueAccessor( + const std::vector<std::vector<MultiSourceAttributeId>> &argument_ids, + const std::vector<MultiSourceAttributeId> &key_ids, + ValueAccessorMultiplexer *accessor_mux) override; + + void finalizeKey(const std::size_t partition_id, + NativeColumnVector *output_cv) const; + + void finalizeState(const std::size_t partition_id, + const std::size_t handle_id, + NativeColumnVector *output_cv) const; + + private: + inline static std::size_t CacheLineAlignedBytes(const std::size_t actual_bytes) { + return (actual_bytes + kCacheLineBytes - 1) / kCacheLineBytes * kCacheLineBytes; + } + + inline std::size_t calculatePartitionLength() const { + const std::size_t partition_length = + (num_entries_ + num_finalize_partitions_ - 1) / num_finalize_partitions_; + DCHECK_GE(partition_length, 1u); + return partition_length; + } + + inline std::size_t calculatePartitionStartPosition( + const std::size_t partition_id) const { + return calculatePartitionLength() * partition_id; + } + + inline std::size_t calculatePartitionEndPosition( + const std::size_t partition_id) const { + return std::min(calculatePartitionLength() * (partition_id + 1), + num_entries_); + } + + template <bool use_two_accessors, typename ...ArgTypes> + inline void upsertValueAccessorDispatchHelper( + const bool is_key_nullable, + const bool is_argument_nullable, + ArgTypes &&...args); + + template <bool ...bool_values, typename ...ArgTypes> + inline void upsertValueAccessorDispatchHelper( + const Type *key_type, + ArgTypes &&...args); + + template <bool use_two_accessors, bool is_key_nullable, bool is_argument_nullable, + typename KeyT, typename ...ArgTypes> + inline void upsertValueAccessorDispatchHelper( + const Type *argument_type, + const AggregationID agg_id, + ArgTypes &&...args); + + template <bool use_two_accessors, bool is_key_nullable, bool is_argument_nullable, + typename KeyT, typename KeyValueAccessorT, typename ArgumentValueAccessorT> + inline void upsertValueAccessorCountHelper( + const attribute_id key_attr_id, + const attribute_id argument_id, + void *vec_table, + KeyValueAccessorT *key_accessor, + ArgumentValueAccessorT *argument_accessor); + + template <bool use_two_accessors, bool is_key_nullable, bool is_argument_nullable, + typename KeyT, typename KeyValueAccessorT, typename ArgumentValueAccessorT> + inline void upsertValueAccessorSumHelper( + const Type *argument_type, + const attribute_id key_attr_id, + const attribute_id argument_id, + void *vec_table, + KeyValueAccessorT *key_accessor, + ArgumentValueAccessorT *argument_accessor); + + template <typename ...ArgTypes> + inline void upsertValueAccessorKeyOnlyHelper( + const bool is_key_nullable, + const Type *key_type, + ArgTypes &&...args); + + template <bool is_key_nullable, typename KeyT, typename KeyValueAccessorT> + inline void upsertValueAccessorKeyOnly( + const attribute_id key_attr_id, + KeyValueAccessorT *key_accessor); + + template <bool is_key_nullable, typename KeyT, typename KeyValueAccessorT> + inline void upsertValueAccessorCountNullary( + const attribute_id key_attr_id, + std::atomic<std::size_t> *vec_table, + KeyValueAccessorT *key_accessor); + + template <bool use_two_accessors, bool is_key_nullable, typename KeyT, + typename KeyValueAccessorT, typename ArgumentValueAccessorT> + inline void upsertValueAccessorCountUnary( + const attribute_id key_attr_id, + const attribute_id argument_id, + std::atomic<std::size_t> *vec_table, + KeyValueAccessorT *key_accessor, + ArgumentValueAccessorT *argument_accessor); + + template <bool 
use_two_accessors, bool is_key_nullable, bool is_argument_nullable, + typename KeyT, typename ArgumentT, typename StateT, + typename KeyValueAccessorT, typename ArgumentValueAccessorT> + inline void upsertValueAccessorIntegerSum( + const attribute_id key_attr_id, + const attribute_id argument_id, + std::atomic<StateT> *vec_table, + KeyValueAccessorT *key_accessor, + ArgumentValueAccessorT *argument_accessor); + + template <bool use_two_accessors, bool is_key_nullable, bool is_argument_nullable, + typename KeyT, typename ArgumentT, typename StateT, + typename KeyValueAccessorT, typename ArgumentValueAccessorT> + inline void upsertValueAccessorGenericSum( + const attribute_id key_attr_id, + const attribute_id argument_id, + std::atomic<StateT> *vec_table, + KeyValueAccessorT *key_accessor, + ArgumentValueAccessorT *argument_accessor); + + template <typename KeyT> + inline void finalizeKeyInternal(const std::size_t start_position, + const std::size_t end_position, + NativeColumnVector *output_cv) const { + std::size_t loc = start_position - 1; + while ((loc = existence_map_->nextOne(loc)) < end_position) { + *static_cast<KeyT *>(output_cv->getPtrForDirectWrite()) = loc; + } + } + + template <typename ...ArgTypes> + inline void finalizeStateDispatchHelper(const AggregationID agg_id, + const Type *argument_type, + const void *vec_table, + ArgTypes &&...args) const { + switch (agg_id) { + case AggregationID::kCount: + finalizeStateCount(static_cast<const std::atomic<std::size_t> *>(vec_table), + std::forward<ArgTypes>(args)...); + return; + case AggregationID::kSum: + finalizeStateSumHelper(argument_type, + vec_table, + std::forward<ArgTypes>(args)...); + return; + default: + LOG(FATAL) << "Not supported"; + } + } + + template <typename ...ArgTypes> + inline void finalizeStateSumHelper(const Type *argument_type, + const void *vec_table, + ArgTypes &&...args) const { + DCHECK(argument_type != nullptr); + + switch (argument_type->getTypeID()) { + case TypeID::kInt: // Fall through + case TypeID::kLong: + finalizeStateSum<std::int64_t>( + static_cast<const std::atomic<std::int64_t> *>(vec_table), + std::forward<ArgTypes>(args)...); + return; + case TypeID::kFloat: // Fall through + case TypeID::kDouble: + finalizeStateSum<double>( + static_cast<const std::atomic<double> *>(vec_table), + std::forward<ArgTypes>(args)...); + return; + default: + LOG(FATAL) << "Not supported"; + } + } + + inline void finalizeStateCount(const std::atomic<std::size_t> *vec_table, + const std::size_t start_position, + const std::size_t end_position, + NativeColumnVector *output_cv) const { + std::size_t loc = start_position - 1; + while ((loc = existence_map_->nextOne(loc)) < end_position) { + *static_cast<std::int64_t *>(output_cv->getPtrForDirectWrite()) = + vec_table[loc].load(std::memory_order_relaxed); + } + } + + template <typename ResultT, typename StateT> + inline void finalizeStateSum(const std::atomic<StateT> *vec_table, + const std::size_t start_position, + const std::size_t end_position, + NativeColumnVector *output_cv) const { + std::size_t loc = start_position - 1; + while ((loc = existence_map_->nextOne(loc)) < end_position) { + *static_cast<ResultT *>(output_cv->getPtrForDirectWrite()) = + vec_table[loc].load(std::memory_order_relaxed); + } + } + + const Type *key_type_; + const std::size_t num_entries_; + + const std::size_t num_handles_; + const std::vector<AggregationHandle *> handles_; + + std::unique_ptr<BarrieredReadWriteConcurrentBitVector> existence_map_; + std::vector<void *> vec_tables_; + + 
const std::size_t num_finalize_partitions_; + + StorageManager *storage_manager_; + MutableBlobReference blob_; + + std::size_t memory_size_; + std::size_t num_init_partitions_; + + DISALLOW_COPY_AND_ASSIGN(CollisionFreeVectorTable); +}; + +// ---------------------------------------------------------------------------- +// Implementations of template methods follow. + +template <bool use_two_accessors, typename ...ArgTypes> +inline void CollisionFreeVectorTable + ::upsertValueAccessorDispatchHelper(const bool is_key_nullable, + const bool is_argument_nullable, + ArgTypes &&...args) { + if (is_key_nullable) { + if (is_argument_nullable) { + upsertValueAccessorDispatchHelper<use_two_accessors, true, true>( + std::forward<ArgTypes>(args)...); + } else { + upsertValueAccessorDispatchHelper<use_two_accessors, true, false>( + std::forward<ArgTypes>(args)...); + } + } else { + if (is_argument_nullable) { + upsertValueAccessorDispatchHelper<use_two_accessors, false, true>( + std::forward<ArgTypes>(args)...); + } else { + upsertValueAccessorDispatchHelper<use_two_accessors, false, false>( + std::forward<ArgTypes>(args)...); + } + } +} + +template <bool ...bool_values, typename ...ArgTypes> +inline void CollisionFreeVectorTable + ::upsertValueAccessorDispatchHelper(const Type *key_type, + ArgTypes &&...args) { + switch (key_type->getTypeID()) { + case TypeID::kInt: + upsertValueAccessorDispatchHelper<bool_values..., int>( + std::forward<ArgTypes>(args)...); + return; + case TypeID::kLong: + upsertValueAccessorDispatchHelper<bool_values..., std::int64_t>( + std::forward<ArgTypes>(args)...); + return; + default: + LOG(FATAL) << "Not supported"; + } +} + +template <bool use_two_accessors, bool is_key_nullable, bool is_argument_nullable, + typename KeyT, typename ...ArgTypes> +inline void CollisionFreeVectorTable + ::upsertValueAccessorDispatchHelper(const Type *argument_type, + const AggregationID agg_id, + ArgTypes &&...args) { + switch (agg_id) { + case AggregationID::kCount: + upsertValueAccessorCountHelper< + use_two_accessors, is_key_nullable, is_argument_nullable, KeyT>( + std::forward<ArgTypes>(args)...); + return; + case AggregationID::kSum: + upsertValueAccessorSumHelper< + use_two_accessors, is_key_nullable, is_argument_nullable, KeyT>( + argument_type, std::forward<ArgTypes>(args)...); + return; + default: + LOG(FATAL) << "Not supported"; + } +} + +template <typename ...ArgTypes> +inline void CollisionFreeVectorTable + ::upsertValueAccessorKeyOnlyHelper(const bool is_key_nullable, + const Type *key_type, + ArgTypes &&...args) { + switch (key_type->getTypeID()) { + case TypeID::kInt: { + if (is_key_nullable) { + upsertValueAccessorKeyOnly<true, int>(std::forward<ArgTypes>(args)...); + } else { + upsertValueAccessorKeyOnly<false, int>(std::forward<ArgTypes>(args)...); + } + return; + } + case TypeID::kLong: { + if (is_key_nullable) { + upsertValueAccessorKeyOnly<true, std::int64_t>(std::forward<ArgTypes>(args)...); + } else { + upsertValueAccessorKeyOnly<false, std::int64_t>(std::forward<ArgTypes>(args)...); + } + return; + } + default: + LOG(FATAL) << "Not supported"; + } +} + +template <bool is_key_nullable, typename KeyT, typename ValueAccessorT> +inline void CollisionFreeVectorTable + ::upsertValueAccessorKeyOnly(const attribute_id key_attr_id, + ValueAccessorT *accessor) { + accessor->beginIteration(); + while (accessor->next()) { + const KeyT *key = static_cast<const KeyT *>( + accessor->template getUntypedValue<is_key_nullable>(key_attr_id)); + if (is_key_nullable && key == nullptr) { + 
continue; + } + existence_map_->setBit(*key); + } +} + +template <bool use_two_accessors, bool is_key_nullable, bool is_argument_nullable, + typename KeyT, typename KeyValueAccessorT, typename ArgumentValueAccessorT> +inline void CollisionFreeVectorTable + ::upsertValueAccessorCountHelper(const attribute_id key_attr_id, + const attribute_id argument_id, + void *vec_table, + KeyValueAccessorT *key_accessor, + ArgumentValueAccessorT *argument_accessor) { + DCHECK_GE(key_attr_id, 0u); + + if (is_argument_nullable && argument_id != kInvalidAttributeID) { + upsertValueAccessorCountUnary<use_two_accessors, is_key_nullable, KeyT>( + key_attr_id, + argument_id, + static_cast<std::atomic<std::size_t> *>(vec_table), + key_accessor, + argument_accessor); + return; + } else { + upsertValueAccessorCountNullary<is_key_nullable, KeyT>( + key_attr_id, + static_cast<std::atomic<std::size_t> *>(vec_table), + key_accessor); + return; + } +} + +template <bool use_two_accessors, bool is_key_nullable, bool is_argument_nullable, + typename KeyT, typename KeyValueAccessorT, typename ArgumentValueAccessorT> +inline void CollisionFreeVectorTable + ::upsertValueAccessorSumHelper(const Type *argument_type, + const attribute_id key_attr_id, + const attribute_id argument_id, + void *vec_table, + KeyValueAccessorT *key_accessor, + ArgumentValueAccessorT *argument_accessor) { + DCHECK_GE(key_attr_id, 0u); + DCHECK_GE(argument_id, 0u); + DCHECK(argument_type != nullptr); + + switch (argument_type->getTypeID()) { + case TypeID::kInt: + upsertValueAccessorIntegerSum< + use_two_accessors, is_key_nullable, is_argument_nullable, KeyT, int>( + key_attr_id, + argument_id, + static_cast<std::atomic<std::int64_t> *>(vec_table), + key_accessor, + argument_accessor); + return; + case TypeID::kLong: + upsertValueAccessorIntegerSum< + use_two_accessors, is_key_nullable, is_argument_nullable, KeyT, std::int64_t>( + key_attr_id, + argument_id, + static_cast<std::atomic<std::int64_t> *>(vec_table), + key_accessor, + argument_accessor); + return; + case TypeID::kFloat: + upsertValueAccessorGenericSum< + use_two_accessors, is_key_nullable, is_argument_nullable, KeyT, float>( + key_attr_id, + argument_id, + static_cast<std::atomic<double> *>(vec_table), + key_accessor, + argument_accessor); + return; + case TypeID::kDouble: + upsertValueAccessorGenericSum< + use_two_accessors, is_key_nullable, is_argument_nullable, KeyT, double>( + key_attr_id, + argument_id, + static_cast<std::atomic<double> *>(vec_table), + key_accessor, + argument_accessor); + return; + default: + LOG(FATAL) << "Not supported"; + } +} + +template <bool is_key_nullable, typename KeyT, typename ValueAccessorT> +inline void CollisionFreeVectorTable + ::upsertValueAccessorCountNullary(const attribute_id key_attr_id, + std::atomic<std::size_t> *vec_table, + ValueAccessorT *accessor) { + accessor->beginIteration(); + while (accessor->next()) { + const KeyT *key = static_cast<const KeyT *>( + accessor->template getUntypedValue<is_key_nullable>(key_attr_id)); + if (is_key_nullable && key == nullptr) { + continue; + } + const std::size_t loc = *key; + vec_table[loc].fetch_add(1u, std::memory_order_relaxed); + existence_map_->setBit(loc); + } +} + +template <bool use_two_accessors, bool is_key_nullable, typename KeyT, + typename KeyValueAccessorT, typename ArgumentValueAccessorT> +inline void CollisionFreeVectorTable + ::upsertValueAccessorCountUnary(const attribute_id key_attr_id, + const attribute_id argument_id, + std::atomic<std::size_t> *vec_table, + KeyValueAccessorT 
*key_accessor, + ArgumentValueAccessorT *argument_accessor) { + key_accessor->beginIteration(); + if (use_two_accessors) { + argument_accessor->beginIteration(); + } + while (key_accessor->next()) { + if (use_two_accessors) { + argument_accessor->next(); + } + const KeyT *key = static_cast<const KeyT *>( + key_accessor->template getUntypedValue<is_key_nullable>(key_attr_id)); + if (is_key_nullable && key == nullptr) { + continue; + } + const std::size_t loc = *key; + existence_map_->setBit(loc); + if (argument_accessor->getUntypedValue(argument_id) == nullptr) { + continue; + } + vec_table[loc].fetch_add(1u, std::memory_order_relaxed); + } +} + +template <bool use_two_accessors, bool is_key_nullable, bool is_argument_nullable, + typename KeyT, typename ArgumentT, typename StateT, + typename KeyValueAccessorT, typename ArgumentValueAccessorT> +inline void CollisionFreeVectorTable + ::upsertValueAccessorIntegerSum(const attribute_id key_attr_id, + const attribute_id argument_id, + std::atomic<StateT> *vec_table, + KeyValueAccessorT *key_accessor, + ArgumentValueAccessorT *argument_accessor) { + key_accessor->beginIteration(); + if (use_two_accessors) { + argument_accessor->beginIteration(); + } + while (key_accessor->next()) { + if (use_two_accessors) { + argument_accessor->next(); + } + const KeyT *key = static_cast<const KeyT *>( + key_accessor->template getUntypedValue<is_key_nullable>(key_attr_id)); + if (is_key_nullable && key == nullptr) { + continue; + } + const std::size_t loc = *key; + existence_map_->setBit(loc); + const ArgumentT *argument = static_cast<const ArgumentT *>( + argument_accessor->template getUntypedValue<is_argument_nullable>(argument_id)); + if (is_argument_nullable && argument == nullptr) { + continue; + } + vec_table[loc].fetch_add(*argument, std::memory_order_relaxed); + } +} + +template <bool use_two_accessors, bool is_key_nullable, bool is_argument_nullable, + typename KeyT, typename ArgumentT, typename StateT, + typename KeyValueAccessorT, typename ArgumentValueAccessorT> +inline void CollisionFreeVectorTable + ::upsertValueAccessorGenericSum(const attribute_id key_attr_id, + const attribute_id argument_id, + std::atomic<StateT> *vec_table, + KeyValueAccessorT *key_accessor, + ArgumentValueAccessorT *argument_accessor) { + key_accessor->beginIteration(); + if (use_two_accessors) { + argument_accessor->beginIteration(); + } + while (key_accessor->next()) { + if (use_two_accessors) { + argument_accessor->next(); + } + const KeyT *key = static_cast<const KeyT *>( + key_accessor->template getUntypedValue<is_key_nullable>(key_attr_id)); + if (is_key_nullable && key == nullptr) { + continue; + } + const std::size_t loc = *key; + existence_map_->setBit(loc); + const ArgumentT *argument = static_cast<const ArgumentT *>( + argument_accessor->template getUntypedValue<is_argument_nullable>(argument_id)); + if (is_argument_nullable && argument == nullptr) { + continue; + } + const ArgumentT arg_val = *argument; + std::atomic<StateT> &state = vec_table[loc]; + StateT state_val = state.load(std::memory_order_relaxed); + while(!state.compare_exchange_weak(state_val, state_val + arg_val)) {} + } +} + +} // namespace quickstep + +#endif // QUICKSTEP_STORAGE_COLLISION_FREE_AGGREGATION_STATE_HASH_TABLE_HPP_
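The new CollisionFreeVectorTable above dispenses with hashing entirely: the group-by key is a bounded, non-negative integer (kInt or kLong), so the key value itself indexes a pre-allocated vector of aggregation states, and a concurrent bit vector records which entries exist. The sketch below is a standalone illustration of that idea, not Quickstep code; the ToyCollisionFreeSumTable name and its members are invented for this example. It shows a double SUM aggregate, including the same kind of compare-and-swap loop that upsertValueAccessorGenericSum uses, since std::atomic<double> has no fetch_add before C++20.

#include <atomic>
#include <cstddef>
#include <vector>

// Toy version of the collision-free scheme: the integer key directly indexes
// a pre-allocated vector of atomic states, so upserts need no hashing,
// probing, or locks.  (Illustrative only; not Quickstep code.)
class ToyCollisionFreeSumTable {
 public:
  // The vector's count constructor value-initializes the atomics, so all
  // sums start at zero (CollisionFreeVectorTable instead memsets its blob
  // partition by partition in initialize()).
  explicit ToyCollisionFreeSumTable(const std::size_t num_entries)
      : states_(num_entries), exists_(num_entries) {}

  // Accumulate 'value' into the state for 'key'.  Mirrors the CAS retry loop
  // in upsertValueAccessorGenericSum; 'expected' is refreshed by
  // compare_exchange_weak on each failed attempt.
  void upsertSum(const std::size_t key, const double value) {
    exists_[key].store(true, std::memory_order_relaxed);  // stand-in for the existence map
    std::atomic<double> &state = states_[key];
    double expected = state.load(std::memory_order_relaxed);
    while (!state.compare_exchange_weak(expected, expected + value)) {}
  }

  // Emit (key, sum) for every key that was touched, in key order; this is
  // roughly what finalizeKey()/finalizeState() produce per partition.
  template <typename EmitFn>
  void finalize(EmitFn emit) const {
    for (std::size_t key = 0; key < states_.size(); ++key) {
      if (exists_[key].load(std::memory_order_relaxed)) {
        emit(key, states_[key].load(std::memory_order_relaxed));
      }
    }
  }

 private:
  std::vector<std::atomic<double>> states_;
  std::vector<std::atomic<bool>> exists_;
};

Integer SUM and COUNT avoid the retry loop entirely by using fetch_add, which is why upsertValueAccessorSumHelper widens kInt/kLong arguments to a 64-bit state and kFloat/kDouble to a double state. The trade-off is memory and scan cost proportional to the key range rather than to the number of distinct groups, which is why the table is zeroed and finalized in partitions (initialize(), getNumInitializationPartitions(), getNumFinalizationPartitions()).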
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/1e7a92a9/storage/HashTableFactory.hpp ---------------------------------------------------------------------- diff --git a/storage/HashTableFactory.hpp b/storage/HashTableFactory.hpp index d95362c..b88bf87 100644 --- a/storage/HashTableFactory.hpp +++ b/storage/HashTableFactory.hpp @@ -24,12 +24,12 @@ #include <string> #include <vector> -#include "storage/CollisionFreeAggregationStateHashTable.hpp" +#include "storage/CollisionFreeVectorTable.hpp" #include "storage/HashTable.hpp" #include "storage/HashTableBase.hpp" #include "storage/HashTable.pb.h" #include "storage/LinearOpenAddressingHashTable.hpp" -#include "storage/PackedPayloadAggregationStateHashTable.hpp" +#include "storage/PackedPayloadHashTable.hpp" #include "storage/SeparateChainingHashTable.hpp" #include "storage/SimpleScalarSeparateChainingHashTable.hpp" #include "storage/TupleReference.hpp" @@ -346,10 +346,10 @@ class AggregationStateHashTableFactory { StorageManager *storage_manager) { switch (hash_table_type) { case HashTableImplType::kSeparateChaining: - return new PackedPayloadSeparateChainingAggregationStateHashTable( + return new PackedPayloadHashTable( key_types, num_entries, handles, storage_manager); case HashTableImplType::kCollisionFreeVector: - return new CollisionFreeAggregationStateHashTable( + return new CollisionFreeVectorTable( key_types, num_entries, handles, storage_manager); default: { LOG(FATAL) << "Unrecognized HashTableImplType in " http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/1e7a92a9/storage/PackedPayloadAggregationStateHashTable.cpp ---------------------------------------------------------------------- diff --git a/storage/PackedPayloadAggregationStateHashTable.cpp b/storage/PackedPayloadAggregationStateHashTable.cpp deleted file mode 100644 index 0292092..0000000 --- a/storage/PackedPayloadAggregationStateHashTable.cpp +++ /dev/null @@ -1,439 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- **/ - -#include "storage/PackedPayloadAggregationStateHashTable.hpp" - -namespace quickstep { - -PackedPayloadSeparateChainingAggregationStateHashTable - ::PackedPayloadSeparateChainingAggregationStateHashTable( - const std::vector<const Type *> &key_types, - const std::size_t num_entries, - const std::vector<AggregationHandle *> &handles, - StorageManager *storage_manager) - : key_types_(key_types), - num_handles_(handles.size()), - handles_(handles), - total_payload_size_(ComputeTotalPayloadSize(handles)), - storage_manager_(storage_manager), - kBucketAlignment(alignof(std::atomic<std::size_t>)), - kValueOffset(sizeof(std::atomic<std::size_t>) + sizeof(std::size_t)), - key_manager_(key_types_, kValueOffset + total_payload_size_), - bucket_size_(ComputeBucketSize(key_manager_.getFixedKeySize())) { - std::size_t payload_offset_running_sum = sizeof(SpinMutex); - for (const auto *handle : handles) { - payload_offsets_.emplace_back(payload_offset_running_sum); - payload_offset_running_sum += handle->getPayloadSize(); - } - - // NOTE(jianqiao): Potential memory leak / double freeing by copying from - // init_payload to buckets if payload contains out of line data. - init_payload_ = - static_cast<std::uint8_t *>(calloc(this->total_payload_size_, 1)); - DCHECK(init_payload_ != nullptr); - - for (std::size_t i = 0; i < num_handles_; ++i) { - handles_[i]->initPayload(init_payload_ + payload_offsets_[i]); - } - - // Bucket size always rounds up to the alignment requirement of the atomic - // size_t "next" pointer at the front or a ValueT, whichever is larger. - // - // Give base HashTable information about what key components are stored - // inline from 'key_manager_'. - setKeyInline(key_manager_.getKeyInline()); - - // Pick out a prime number of slots and calculate storage requirements. - std::size_t num_slots_tmp = - get_next_prime_number(num_entries * kHashTableLoadFactor); - std::size_t required_memory = - sizeof(Header) + num_slots_tmp * sizeof(std::atomic<std::size_t>) + - (num_slots_tmp / kHashTableLoadFactor) * - (bucket_size_ + key_manager_.getEstimatedVariableKeySize()); - std::size_t num_storage_slots = - this->storage_manager_->SlotsNeededForBytes(required_memory); - if (num_storage_slots == 0) { - FATAL_ERROR( - "Storage requirement for SeparateChainingHashTable " - "exceeds maximum allocation size."); - } - - // Get a StorageBlob to hold the hash table. - const block_id blob_id = - this->storage_manager_->createBlob(num_storage_slots); - this->blob_ = this->storage_manager_->getBlobMutable(blob_id); - - void *aligned_memory_start = this->blob_->getMemoryMutable(); - std::size_t available_memory = num_storage_slots * kSlotSizeBytes; - if (align(alignof(Header), - sizeof(Header), - aligned_memory_start, - available_memory) == nullptr) { - // With current values from StorageConstants.hpp, this should be - // impossible. A blob is at least 1 MB, while a Header has alignment - // requirement of just kCacheLineBytes (64 bytes). - FATAL_ERROR( - "StorageBlob used to hold resizable " - "SeparateChainingHashTable is too small to meet alignment " - "requirements of SeparateChainingHashTable::Header."); - } else if (aligned_memory_start != this->blob_->getMemoryMutable()) { - // This should also be impossible, since the StorageManager allocates slots - // aligned to kCacheLineBytes. 
- DEV_WARNING("StorageBlob memory adjusted by " - << (num_storage_slots * kSlotSizeBytes - available_memory) - << " bytes to meet alignment requirement for " - << "SeparateChainingHashTable::Header."); - } - - // Locate the header. - header_ = static_cast<Header *>(aligned_memory_start); - aligned_memory_start = - static_cast<char *>(aligned_memory_start) + sizeof(Header); - available_memory -= sizeof(Header); - - // Recompute the number of slots & buckets using the actual available memory. - // Most likely, we got some extra free bucket space due to "rounding up" to - // the storage blob's size. It's also possible (though very unlikely) that we - // will wind up with fewer buckets than we initially wanted because of screwy - // alignment requirements for ValueT. - std::size_t num_buckets_tmp = - available_memory / - (kHashTableLoadFactor * sizeof(std::atomic<std::size_t>) + bucket_size_ + - key_manager_.getEstimatedVariableKeySize()); - num_slots_tmp = - get_previous_prime_number(num_buckets_tmp * kHashTableLoadFactor); - num_buckets_tmp = num_slots_tmp / kHashTableLoadFactor; - DEBUG_ASSERT(num_slots_tmp > 0); - DEBUG_ASSERT(num_buckets_tmp > 0); - - // Locate the slot array. - slots_ = static_cast<std::atomic<std::size_t> *>(aligned_memory_start); - aligned_memory_start = static_cast<char *>(aligned_memory_start) + - sizeof(std::atomic<std::size_t>) * num_slots_tmp; - available_memory -= sizeof(std::atomic<std::size_t>) * num_slots_tmp; - - // Locate the buckets. - buckets_ = aligned_memory_start; - // Extra-paranoid: If ValueT has an alignment requirement greater than that - // of std::atomic<std::size_t>, we may need to adjust the start of the bucket - // array. - if (align(kBucketAlignment, bucket_size_, buckets_, available_memory) == - nullptr) { - FATAL_ERROR( - "StorageBlob used to hold resizable " - "SeparateChainingHashTable is too small to meet " - "alignment requirements of buckets."); - } else if (buckets_ != aligned_memory_start) { - DEV_WARNING( - "Bucket array start position adjusted to meet alignment " - "requirement for SeparateChainingHashTable's value type."); - if (num_buckets_tmp * bucket_size_ > available_memory) { - --num_buckets_tmp; - } - } - - // Fill in the header. - header_->num_slots = num_slots_tmp; - header_->num_buckets = num_buckets_tmp; - header_->buckets_allocated.store(0, std::memory_order_relaxed); - header_->variable_length_bytes_allocated.store(0, std::memory_order_relaxed); - available_memory -= bucket_size_ * (header_->num_buckets); - - // Locate variable-length key storage region, and give it all the remaining - // bytes in the blob. - key_manager_.setVariableLengthStorageInfo( - static_cast<char *>(buckets_) + header_->num_buckets * bucket_size_, - available_memory, - &(header_->variable_length_bytes_allocated)); -} - -PackedPayloadSeparateChainingAggregationStateHashTable - ::~PackedPayloadSeparateChainingAggregationStateHashTable() { - if (blob_.valid()) { - const block_id blob_id = blob_->getID(); - blob_.release(); - storage_manager_->deleteBlockOrBlobFile(blob_id); - } - std::free(init_payload_); -} - -void PackedPayloadSeparateChainingAggregationStateHashTable::clear() { - const std::size_t used_buckets = - header_->buckets_allocated.load(std::memory_order_relaxed); - // Destroy existing values, if necessary. - destroyPayload(); - - // Zero-out slot array. - std::memset( - slots_, 0x0, sizeof(std::atomic<std::size_t>) * header_->num_slots); - - // Zero-out used buckets. 
- std::memset(buckets_, 0x0, used_buckets * bucket_size_); - - header_->buckets_allocated.store(0, std::memory_order_relaxed); - header_->variable_length_bytes_allocated.store(0, std::memory_order_relaxed); - key_manager_.zeroNextVariableLengthKeyOffset(); -} - -void PackedPayloadSeparateChainingAggregationStateHashTable::destroyPayload() { - const std::size_t num_buckets = - header_->buckets_allocated.load(std::memory_order_relaxed); - void *bucket_ptr = static_cast<char *>(buckets_) + kValueOffset; - for (std::size_t bucket_num = 0; bucket_num < num_buckets; ++bucket_num) { - for (std::size_t handle_id = 0; handle_id < num_handles_; ++handle_id) { - void *value_internal_ptr = - static_cast<char *>(bucket_ptr) + this->payload_offsets_[handle_id]; - handles_[handle_id]->destroyPayload(static_cast<std::uint8_t *>(value_internal_ptr)); - } - bucket_ptr = static_cast<char *>(bucket_ptr) + bucket_size_; - } -} - -bool PackedPayloadSeparateChainingAggregationStateHashTable::upsertValueAccessor( - const std::vector<std::vector<MultiSourceAttributeId>> &argument_ids, - const std::vector<MultiSourceAttributeId> &key_attr_ids, - ValueAccessorMultiplexer *accessor_mux) { - DCHECK(accessor_mux->getDerivedAccessor()->getImplementationType() - == ValueAccessor::Implementation::kColumnVectors); - ValueAccessor *base_accessor = accessor_mux->getBaseAccessor(); - ColumnVectorsValueAccessor *derived_accessor = - static_cast<ColumnVectorsValueAccessor *>(accessor_mux->getDerivedAccessor()); - - if (derived_accessor == nullptr) { - return upsertValueAccessorCompositeKeyInternal<false>(argument_ids, - key_attr_ids, - base_accessor, - derived_accessor); - } else { - return upsertValueAccessorCompositeKeyInternal<true>(argument_ids, - key_attr_ids, - base_accessor, - derived_accessor); - } -} - -void PackedPayloadSeparateChainingAggregationStateHashTable - ::resize(const std::size_t extra_buckets, - const std::size_t extra_variable_storage, - const std::size_t retry_num) { - // A retry should never be necessary with this implementation of HashTable. - // Separate chaining ensures that any resized hash table with more buckets - // than the original table will be able to hold more entries than the - // original. - DEBUG_ASSERT(retry_num == 0); - - SpinSharedMutexExclusiveLock<true> write_lock(this->resize_shared_mutex_); - - // Recheck whether the hash table is still full. Note that multiple threads - // might wait to rebuild this hash table simultaneously. Only the first one - // should do the rebuild. - if (!isFull(extra_variable_storage)) { - return; - } - - // Approximately double the number of buckets and slots. - // - // TODO(chasseur): It may be worth it to more than double the number of - // buckets here so that we can maintain a good, sparse fill factor for a - // longer time as more values are inserted. Such behavior should take into - // account kHashTableLoadFactor. - std::size_t resized_num_slots = get_next_prime_number( - (header_->num_buckets + extra_buckets / 2) * kHashTableLoadFactor * 2); - std::size_t variable_storage_required = - (resized_num_slots / kHashTableLoadFactor) * - key_manager_.getEstimatedVariableKeySize(); - const std::size_t original_variable_storage_used = - header_->variable_length_bytes_allocated.load(std::memory_order_relaxed); - // If this resize was triggered by a too-large variable-length key, bump up - // the variable-length storage requirement. 
- if ((extra_variable_storage > 0) && - (extra_variable_storage + original_variable_storage_used > - key_manager_.getVariableLengthKeyStorageSize())) { - variable_storage_required += extra_variable_storage; - } - - const std::size_t resized_memory_required = - sizeof(Header) + resized_num_slots * sizeof(std::atomic<std::size_t>) + - (resized_num_slots / kHashTableLoadFactor) * bucket_size_ + - variable_storage_required; - const std::size_t resized_storage_slots = - this->storage_manager_->SlotsNeededForBytes(resized_memory_required); - if (resized_storage_slots == 0) { - FATAL_ERROR( - "Storage requirement for resized SeparateChainingHashTable " - "exceeds maximum allocation size."); - } - - // Get a new StorageBlob to hold the resized hash table. - const block_id resized_blob_id = - this->storage_manager_->createBlob(resized_storage_slots); - MutableBlobReference resized_blob = - this->storage_manager_->getBlobMutable(resized_blob_id); - - // Locate data structures inside the new StorageBlob. - void *aligned_memory_start = resized_blob->getMemoryMutable(); - std::size_t available_memory = resized_storage_slots * kSlotSizeBytes; - if (align(alignof(Header), - sizeof(Header), - aligned_memory_start, - available_memory) == nullptr) { - // Should be impossible, as noted in constructor. - FATAL_ERROR( - "StorageBlob used to hold resized SeparateChainingHashTable " - "is too small to meet alignment requirements of " - "LinearOpenAddressingHashTable::Header."); - } else if (aligned_memory_start != resized_blob->getMemoryMutable()) { - // Again, should be impossible. - DEV_WARNING("In SeparateChainingHashTable::resize(), StorageBlob " - << "memory adjusted by " - << (resized_num_slots * kSlotSizeBytes - available_memory) - << " bytes to meet alignment requirement for " - << "LinearOpenAddressingHashTable::Header."); - } - - Header *resized_header = static_cast<Header *>(aligned_memory_start); - aligned_memory_start = - static_cast<char *>(aligned_memory_start) + sizeof(Header); - available_memory -= sizeof(Header); - - // As in constructor, recompute the number of slots and buckets using the - // actual available memory. - std::size_t resized_num_buckets = - (available_memory - extra_variable_storage) / - (kHashTableLoadFactor * sizeof(std::atomic<std::size_t>) + bucket_size_ + - key_manager_.getEstimatedVariableKeySize()); - resized_num_slots = - get_previous_prime_number(resized_num_buckets * kHashTableLoadFactor); - resized_num_buckets = resized_num_slots / kHashTableLoadFactor; - - // Locate slot array. - std::atomic<std::size_t> *resized_slots = - static_cast<std::atomic<std::size_t> *>(aligned_memory_start); - aligned_memory_start = static_cast<char *>(aligned_memory_start) + - sizeof(std::atomic<std::size_t>) * resized_num_slots; - available_memory -= sizeof(std::atomic<std::size_t>) * resized_num_slots; - - // As in constructor, we will be extra paranoid and use align() to locate the - // start of the array of buckets, as well. 
- void *resized_buckets = aligned_memory_start; - if (align( - kBucketAlignment, bucket_size_, resized_buckets, available_memory) == - nullptr) { - FATAL_ERROR( - "StorageBlob used to hold resized SeparateChainingHashTable " - "is too small to meet alignment requirements of buckets."); - } else if (resized_buckets != aligned_memory_start) { - DEV_WARNING( - "Bucket array start position adjusted to meet alignment " - "requirement for SeparateChainingHashTable's value type."); - if (resized_num_buckets * bucket_size_ + variable_storage_required > - available_memory) { - --resized_num_buckets; - } - } - aligned_memory_start = static_cast<char *>(aligned_memory_start) + - resized_num_buckets * bucket_size_; - available_memory -= resized_num_buckets * bucket_size_; - - void *resized_variable_length_key_storage = aligned_memory_start; - const std::size_t resized_variable_length_key_storage_size = available_memory; - - const std::size_t original_buckets_used = - header_->buckets_allocated.load(std::memory_order_relaxed); - - // Initialize the header. - resized_header->num_slots = resized_num_slots; - resized_header->num_buckets = resized_num_buckets; - resized_header->buckets_allocated.store(original_buckets_used, - std::memory_order_relaxed); - resized_header->variable_length_bytes_allocated.store( - original_variable_storage_used, std::memory_order_relaxed); - - // Bulk-copy buckets. This is safe because: - // 1. The "next" pointers will be adjusted when rebuilding chains below. - // 2. The hash codes will stay the same. - // 3. For key components: - // a. Inline keys will stay exactly the same. - // b. Offsets into variable-length storage will remain valid, because - // we also do a byte-for-byte copy of variable-length storage below. - // c. Absolute external pointers will still point to the same address. - // d. Relative pointers are not used with resizable hash tables. - // 4. If values are not trivially copyable, then we invoke ValueT's copy - // or move constructor with placement new. - // NOTE(harshad) - Regarding point 4 above, as this is a specialized - // hash table implemented for aggregation, the values are trivially copyable, - // therefore we don't need to invoke payload values' copy/move constructors. - std::memcpy(resized_buckets, buckets_, original_buckets_used * bucket_size_); - - // Copy over variable-length key components, if any. - if (original_variable_storage_used > 0) { - DEBUG_ASSERT(original_variable_storage_used == - key_manager_.getNextVariableLengthKeyOffset()); - DEBUG_ASSERT(original_variable_storage_used <= - resized_variable_length_key_storage_size); - std::memcpy(resized_variable_length_key_storage, - key_manager_.getVariableLengthKeyStorage(), - original_variable_storage_used); - } - - destroyPayload(); - - // Make resized structures active. - std::swap(this->blob_, resized_blob); - header_ = resized_header; - slots_ = resized_slots; - buckets_ = resized_buckets; - key_manager_.setVariableLengthStorageInfo( - resized_variable_length_key_storage, - resized_variable_length_key_storage_size, - &(resized_header->variable_length_bytes_allocated)); - - // Drop the old blob. - const block_id old_blob_id = resized_blob->getID(); - resized_blob.release(); - this->storage_manager_->deleteBlockOrBlobFile(old_blob_id); - - // Rebuild chains. 
- void *current_bucket = buckets_; - for (std::size_t bucket_num = 0; bucket_num < original_buckets_used; - ++bucket_num) { - std::atomic<std::size_t> *next_ptr = - static_cast<std::atomic<std::size_t> *>(current_bucket); - const std::size_t hash_code = *reinterpret_cast<const std::size_t *>( - static_cast<const char *>(current_bucket) + - sizeof(std::atomic<std::size_t>)); - - const std::size_t slot_number = hash_code % header_->num_slots; - std::size_t slot_ptr_value = 0; - if (slots_[slot_number].compare_exchange_strong( - slot_ptr_value, bucket_num + 1, std::memory_order_relaxed)) { - // This bucket is the first in the chain for this block, so reset its - // next pointer to 0. - next_ptr->store(0, std::memory_order_relaxed); - } else { - // A chain already exists starting from this slot, so put this bucket at - // the head. - next_ptr->store(slot_ptr_value, std::memory_order_relaxed); - slots_[slot_number].store(bucket_num + 1, std::memory_order_relaxed); - } - current_bucket = static_cast<char *>(current_bucket) + bucket_size_; - } -} - -} // namespace quickstep http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/1e7a92a9/storage/PackedPayloadAggregationStateHashTable.hpp ---------------------------------------------------------------------- diff --git a/storage/PackedPayloadAggregationStateHashTable.hpp b/storage/PackedPayloadAggregationStateHashTable.hpp deleted file mode 100644 index 85d4f8a..0000000 --- a/storage/PackedPayloadAggregationStateHashTable.hpp +++ /dev/null @@ -1,805 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- **/ - -#ifndef QUICKSTEP_STORAGE_PACKED_PAYLOAD_AGGREGATION_STATE_HASH_TABLE_HPP_ -#define QUICKSTEP_STORAGE_PACKED_PAYLOAD_AGGREGATION_STATE_HASH_TABLE_HPP_ - -#include <algorithm> -#include <atomic> -#include <cstddef> -#include <cstdlib> -#include <limits> -#include <memory> -#include <type_traits> -#include <utility> -#include <vector> - -#include "catalog/CatalogTypedefs.hpp" -#include "expressions/aggregation/AggregationHandle.hpp" -#include "storage/HashTableBase.hpp" -#include "storage/HashTableKeyManager.hpp" -#include "storage/StorageBlob.hpp" -#include "storage/StorageBlockInfo.hpp" -#include "storage/StorageConstants.hpp" -#include "storage/StorageManager.hpp" -#include "storage/TupleReference.hpp" -#include "storage/ValueAccessor.hpp" -#include "storage/ValueAccessorMultiplexer.hpp" -#include "storage/ValueAccessorUtil.hpp" -#include "threading/SpinMutex.hpp" -#include "threading/SpinSharedMutex.hpp" -#include "types/Type.hpp" -#include "types/TypedValue.hpp" -#include "types/containers/ColumnVectorsValueAccessor.hpp" -#include "utility/Alignment.hpp" -#include "utility/HashPair.hpp" -#include "utility/Macros.hpp" -#include "utility/PrimeNumber.hpp" - -namespace quickstep { - -/** \addtogroup Storage - * @{ - */ - -class PackedPayloadSeparateChainingAggregationStateHashTable - : public AggregationStateHashTableBase { - public: - PackedPayloadSeparateChainingAggregationStateHashTable( - const std::vector<const Type *> &key_types, - const std::size_t num_entries, - const std::vector<AggregationHandle *> &handles, - StorageManager *storage_manager); - - ~PackedPayloadSeparateChainingAggregationStateHashTable() override; - - void clear(); - - void destroyPayload() override; - - bool upsertValueAccessor( - const std::vector<std::vector<MultiSourceAttributeId>> &argument_ids, - const std::vector<MultiSourceAttributeId> &key_ids, - ValueAccessorMultiplexer *accessor_mux) override; - - inline block_id getBlobId() const { - return blob_->getID(); - } - - inline std::size_t numEntries() const { - return header_->buckets_allocated.load(std::memory_order_relaxed); - } - - inline bool upsertCompositeKey(const std::vector<TypedValue> &key, - const std::uint8_t *source_state); - - template <typename FunctorT> - inline bool upsertCompositeKey(const std::vector<TypedValue> &key, - FunctorT *functor, - int index); - - inline const std::uint8_t* getSingleCompositeKey( - const std::vector<TypedValue> &key) const; - - inline const std::uint8_t* getSingleCompositeKey( - const std::vector<TypedValue> &key, - const int index) const; - - template <typename FunctorT> - inline std::size_t forEach(FunctorT *functor) const; - - template <typename FunctorT> - inline std::size_t forEach(FunctorT *functor, const int index) const; - - template <typename FunctorT> - inline std::size_t forEachCompositeKey(FunctorT *functor) const; - - template <typename FunctorT> - inline std::size_t forEachCompositeKey(FunctorT *functor, - const int index) const; - - private: - void resize(const std::size_t extra_buckets, - const std::size_t extra_variable_storage, - const std::size_t retry_num = 0); - - inline std::size_t calculateVariableLengthCompositeKeyCopySize( - const std::vector<TypedValue> &key) const { - std::size_t total = 0; - for (std::vector<TypedValue>::size_type idx = 0; idx < key.size(); ++idx) { - if (!(*key_inline_)[idx]) { - total += key[idx].getDataSize(); - } - } - return total; - } - - inline bool getNextEntry(TypedValue *key, - const std::uint8_t **value, - std::size_t *entry_num) const; - - inline 
bool getNextEntryCompositeKey(std::vector<TypedValue> *key, - const std::uint8_t **value, - std::size_t *entry_num) const; - - inline std::uint8_t* upsertCompositeKeyInternal( - const std::vector<TypedValue> &key, - const std::size_t variable_key_size); - - template <bool use_two_accessors> - inline bool upsertValueAccessorCompositeKeyInternal( - const std::vector<std::vector<MultiSourceAttributeId>> &argument_ids, - const std::vector<MultiSourceAttributeId> &key_ids, - ValueAccessor *base_accessor, - ColumnVectorsValueAccessor *derived_accessor); - - // Generate a hash for a composite key by hashing each component of 'key' and - // mixing their bits with CombineHashes(). - inline std::size_t hashCompositeKey(const std::vector<TypedValue> &key) const; - - // Set information about which key components are stored inline. This usually - // comes from a HashTableKeyManager, and is set by the constructor of a - // subclass of HashTable. - inline void setKeyInline(const std::vector<bool> *key_inline) { - scalar_key_inline_ = key_inline->front(); - key_inline_ = key_inline; - } - - inline static std::size_t ComputeTotalPayloadSize( - const std::vector<AggregationHandle *> &handles) { - std::size_t total_payload_size = sizeof(SpinMutex); - for (const auto *handle : handles) { - total_payload_size += handle->getPayloadSize(); - } - return total_payload_size; - } - - // Assign '*key_vector' with the attribute values specified by 'key_ids' at - // the current position of 'accessor'. If 'check_for_null_keys' is true, stops - // and returns true if any of the values is null, otherwise returns false. - template <bool use_two_accessors, - bool check_for_null_keys, - typename ValueAccessorT> - inline static bool GetCompositeKeyFromValueAccessor( - const std::vector<MultiSourceAttributeId> &key_ids, - const ValueAccessorT *accessor, - const ColumnVectorsValueAccessor *derived_accessor, - std::vector<TypedValue> *key_vector) { - for (std::size_t key_idx = 0; key_idx < key_ids.size(); ++key_idx) { - const MultiSourceAttributeId &key_id = key_ids[key_idx]; - if (use_two_accessors && key_id.source == ValueAccessorSource::kDerived) { - (*key_vector)[key_idx] = derived_accessor->getTypedValue(key_id.attr_id); - } else { - (*key_vector)[key_idx] = accessor->getTypedValue(key_id.attr_id); - } - if (check_for_null_keys && (*key_vector)[key_idx].isNull()) { - return true; - } - } - return false; - } - - struct Header { - std::size_t num_slots; - std::size_t num_buckets; - alignas(kCacheLineBytes) std::atomic<std::size_t> buckets_allocated; - alignas(kCacheLineBytes) - std::atomic<std::size_t> variable_length_bytes_allocated; - }; - - // Type(s) of keys. - const std::vector<const Type *> key_types_; - - // Information about whether key components are stored inline or in a - // separate variable-length storage region. This is usually determined by a - // HashTableKeyManager and set by calling setKeyInline(). - bool scalar_key_inline_; - const std::vector<bool> *key_inline_; - - const std::size_t num_handles_; - const std::vector<AggregationHandle *> handles_; - - std::size_t total_payload_size_; - std::vector<std::size_t> payload_offsets_; - std::uint8_t *init_payload_; - - StorageManager *storage_manager_; - MutableBlobReference blob_; - - // Locked in shared mode for most operations, exclusive mode during resize. - // Not locked at all for non-resizable HashTables. 
- alignas(kCacheLineBytes) SpinSharedMutex<true> resize_shared_mutex_; - - std::size_t kBucketAlignment; - - // Value's offset in a bucket is the first alignof(ValueT) boundary after the - // next pointer and hash code. - std::size_t kValueOffset; - - // Round bucket size up to a multiple of kBucketAlignment. - constexpr std::size_t ComputeBucketSize(const std::size_t fixed_key_size) { - return (((kValueOffset + this->total_payload_size_ + fixed_key_size - 1) / - kBucketAlignment) + - 1) * - kBucketAlignment; - } - - // Attempt to find an empty bucket to insert 'hash_code' into, starting after - // '*bucket' in the chain (or, if '*bucket' is NULL, starting from the slot - // array). Returns true and stores SIZE_T_MAX in '*pending_chain_ptr' if an - // empty bucket is found. Returns false if 'allow_duplicate_keys' is false - // and a hash collision is found (caller should then check whether there is a - // genuine key collision or the hash collision is spurious). Returns false - // and sets '*bucket' to NULL if there are no more empty buckets in the hash - // table. If 'variable_key_allocation_required' is nonzero, this method will - // attempt to allocate storage for a variable-length key BEFORE allocating a - // bucket, so that no bucket number below 'header_->num_buckets' is ever - // deallocated after being allocated. - inline bool locateBucketForInsertion( - const std::size_t hash_code, - const std::size_t variable_key_allocation_required, - void **bucket, - std::atomic<std::size_t> **pending_chain_ptr, - std::size_t *pending_chain_ptr_finish_value); - - // Write a scalar 'key' and its 'hash_code' into the '*bucket', which was - // found by locateBucketForInsertion(). Assumes that storage for a - // variable-length key copy (if any) was already allocated by a successful - // call to allocateVariableLengthKeyStorage(). - inline void writeScalarKeyToBucket( - const TypedValue &key, - const std::size_t hash_code, - void *bucket); - - // Write a composite 'key' and its 'hash_code' into the '*bucket', which was - // found by locateBucketForInsertion(). Assumes that storage for - // variable-length key copies (if any) was already allocated by a successful - // call to allocateVariableLengthKeyStorage(). - inline void writeCompositeKeyToBucket( - const std::vector<TypedValue> &key, - const std::size_t hash_code, - void *bucket); - - // Determine whether it is actually necessary to resize this hash table. - // Checks that there is at least one unallocated bucket, and that there is - // at least 'extra_variable_storage' bytes of variable-length storage free. - inline bool isFull(const std::size_t extra_variable_storage) const; - - // Helper object to manage key storage. - HashTableKeyManager<false, true> key_manager_; - - // In-memory structure is as follows: - // - SeparateChainingHashTable::Header - // - Array of slots, interpreted as follows: - // - 0 = Points to nothing (empty) - // - SIZE_T_MAX = Pending (some thread is starting a chain from this - // slot and will overwrite it soon) - // - Anything else = The number of the first bucket in the chain for - // this slot PLUS ONE (i.e. subtract one to get the actual bucket - // number). - // - Array of buckets, each of which is: - // - atomic size_t "next" pointer, interpreted the same as slots above. 
- // - size_t hash value - // - possibly some unused bytes as needed so that ValueT's alignment - // requirement is met - // - ValueT value slot - // - fixed-length key storage (which may include pointers to external - // memory or offsets of variable length keys stored within this hash - // table) - // - possibly some additional unused bytes so that bucket size is a - // multiple of both alignof(std::atomic<std::size_t>) and - // alignof(ValueT) - // - Variable-length key storage region (referenced by offsets stored in - // fixed-length keys). - Header *header_; - - std::atomic<std::size_t> *slots_; - void *buckets_; - const std::size_t bucket_size_; - - DISALLOW_COPY_AND_ASSIGN(PackedPayloadSeparateChainingAggregationStateHashTable); -}; - -/** @} */ - -// ---------------------------------------------------------------------------- -// Implementations of template class methods follow. - -class HashTableMerger { - public: - /** - * @brief Constructor - * - * @param handle The Aggregation handle being used. - * @param destination_hash_table The destination hash table to which other - * hash tables will be merged. - **/ - explicit HashTableMerger( - AggregationStateHashTableBase *destination_hash_table) - : destination_hash_table_( - static_cast<PackedPayloadSeparateChainingAggregationStateHashTable *>( - destination_hash_table)) {} - - /** - * @brief The operator for the functor. - * - * @param group_by_key The group by key being merged. - * @param source_state The aggregation state for the given key in the source - * aggregation hash table. - **/ - inline void operator()(const std::vector<TypedValue> &group_by_key, - const std::uint8_t *source_state) { - destination_hash_table_->upsertCompositeKey(group_by_key, source_state); - } - - private: - PackedPayloadSeparateChainingAggregationStateHashTable *destination_hash_table_; - - DISALLOW_COPY_AND_ASSIGN(HashTableMerger); -}; - -inline std::size_t PackedPayloadSeparateChainingAggregationStateHashTable - ::hashCompositeKey(const std::vector<TypedValue> &key) const { - DEBUG_ASSERT(!key.empty()); - DEBUG_ASSERT(key.size() == key_types_.size()); - std::size_t hash = key.front().getHash(); - for (std::vector<TypedValue>::const_iterator key_it = key.begin() + 1; - key_it != key.end(); - ++key_it) { - hash = CombineHashes(hash, key_it->getHash()); - } - return hash; -} - -inline bool PackedPayloadSeparateChainingAggregationStateHashTable - ::getNextEntry(TypedValue *key, - const std::uint8_t **value, - std::size_t *entry_num) const { - if (*entry_num < header_->buckets_allocated.load(std::memory_order_relaxed)) { - const char *bucket = - static_cast<const char *>(buckets_) + (*entry_num) * bucket_size_; - *key = key_manager_.getKeyComponentTyped(bucket, 0); - *value = reinterpret_cast<const std::uint8_t *>(bucket + kValueOffset); - ++(*entry_num); - return true; - } else { - return false; - } -} - - -inline bool PackedPayloadSeparateChainingAggregationStateHashTable - ::getNextEntryCompositeKey(std::vector<TypedValue> *key, - const std::uint8_t **value, - std::size_t *entry_num) const { - if (*entry_num < header_->buckets_allocated.load(std::memory_order_relaxed)) { - const char *bucket = - static_cast<const char *>(buckets_) + (*entry_num) * bucket_size_; - for (std::vector<const Type *>::size_type key_idx = 0; - key_idx < this->key_types_.size(); - ++key_idx) { - key->emplace_back(key_manager_.getKeyComponentTyped(bucket, key_idx)); - } - *value = reinterpret_cast<const std::uint8_t *>(bucket + kValueOffset); - ++(*entry_num); - return true; - 
} else { - return false; - } -} - - -inline bool PackedPayloadSeparateChainingAggregationStateHashTable - ::locateBucketForInsertion(const std::size_t hash_code, - const std::size_t variable_key_allocation_required, - void **bucket, - std::atomic<std::size_t> **pending_chain_ptr, - std::size_t *pending_chain_ptr_finish_value) { - if (*bucket == nullptr) { - *pending_chain_ptr = &(slots_[hash_code % header_->num_slots]); - } else { - *pending_chain_ptr = static_cast<std::atomic<std::size_t> *>(*bucket); - } - for (;;) { - std::size_t existing_chain_ptr = 0; - if ((*pending_chain_ptr) - ->compare_exchange_strong(existing_chain_ptr, - std::numeric_limits<std::size_t>::max(), - std::memory_order_acq_rel)) { - // Got to the end of the chain. Allocate a new bucket. - - // First, allocate variable-length key storage, if needed (i.e. if this - // is an upsert and we didn't allocate up-front). - if (!key_manager_.allocateVariableLengthKeyStorage( - variable_key_allocation_required)) { - // Ran out of variable-length storage. - (*pending_chain_ptr)->store(0, std::memory_order_release); - *bucket = nullptr; - return false; - } - - const std::size_t allocated_bucket_num = - header_->buckets_allocated.fetch_add(1, std::memory_order_relaxed); - if (allocated_bucket_num >= header_->num_buckets) { - // Ran out of buckets. - header_->buckets_allocated.fetch_sub(1, std::memory_order_relaxed); - (*pending_chain_ptr)->store(0, std::memory_order_release); - *bucket = nullptr; - return false; - } else { - *bucket = - static_cast<char *>(buckets_) + allocated_bucket_num * bucket_size_; - *pending_chain_ptr_finish_value = allocated_bucket_num + 1; - return true; - } - } - // Spin until the real "next" pointer is available. - while (existing_chain_ptr == std::numeric_limits<std::size_t>::max()) { - existing_chain_ptr = - (*pending_chain_ptr)->load(std::memory_order_acquire); - } - if (existing_chain_ptr == 0) { - // Other thread had to roll back, so try again. - continue; - } - // Chase the next pointer. - *bucket = - static_cast<char *>(buckets_) + (existing_chain_ptr - 1) * bucket_size_; - *pending_chain_ptr = static_cast<std::atomic<std::size_t> *>(*bucket); - const std::size_t hash_in_bucket = *reinterpret_cast<const std::size_t *>( - static_cast<const char *>(*bucket) + - sizeof(std::atomic<std::size_t>)); - if (hash_in_bucket == hash_code) { - return false; - } - } -} - -inline const std::uint8_t* PackedPayloadSeparateChainingAggregationStateHashTable - ::getSingleCompositeKey(const std::vector<TypedValue> &key) const { - DEBUG_ASSERT(this->key_types_.size() == key.size()); - - const std::size_t hash_code = this->hashCompositeKey(key); - std::size_t bucket_ref = - slots_[hash_code % header_->num_slots].load(std::memory_order_relaxed); - while (bucket_ref != 0) { - DEBUG_ASSERT(bucket_ref != std::numeric_limits<std::size_t>::max()); - const char *bucket = - static_cast<const char *>(buckets_) + (bucket_ref - 1) * bucket_size_; - const std::size_t bucket_hash = *reinterpret_cast<const std::size_t *>( - bucket + sizeof(std::atomic<std::size_t>)); - if ((bucket_hash == hash_code) && - key_manager_.compositeKeyCollisionCheck(key, bucket)) { - // Match located. - return reinterpret_cast<const std::uint8_t *>(bucket + kValueOffset); - } - bucket_ref = - reinterpret_cast<const std::atomic<std::size_t> *>(bucket)->load( - std::memory_order_relaxed); - } - - // Reached the end of the chain and didn't find a match. 
-  return nullptr;
-}
-
-inline const std::uint8_t* PackedPayloadSeparateChainingAggregationStateHashTable
-    ::getSingleCompositeKey(const std::vector<TypedValue> &key,
-                            const int index) const {
-  DEBUG_ASSERT(this->key_types_.size() == key.size());
-
-  const std::size_t hash_code = this->hashCompositeKey(key);
-  std::size_t bucket_ref =
-      slots_[hash_code % header_->num_slots].load(std::memory_order_relaxed);
-  while (bucket_ref != 0) {
-    DEBUG_ASSERT(bucket_ref != std::numeric_limits<std::size_t>::max());
-    const char *bucket =
-        static_cast<const char *>(buckets_) + (bucket_ref - 1) * bucket_size_;
-    const std::size_t bucket_hash = *reinterpret_cast<const std::size_t *>(
-        bucket + sizeof(std::atomic<std::size_t>));
-    if ((bucket_hash == hash_code) &&
-        key_manager_.compositeKeyCollisionCheck(key, bucket)) {
-      // Match located.
-      return reinterpret_cast<const std::uint8_t *>(bucket + kValueOffset) +
-             this->payload_offsets_[index];
-    }
-    bucket_ref =
-        reinterpret_cast<const std::atomic<std::size_t> *>(bucket)->load(
-            std::memory_order_relaxed);
-  }
-
-  // Reached the end of the chain and didn't find a match.
-  return nullptr;
-}
-
-inline bool PackedPayloadSeparateChainingAggregationStateHashTable
-    ::upsertCompositeKey(const std::vector<TypedValue> &key,
-                         const std::uint8_t *source_state) {
-  const std::size_t variable_size =
-      calculateVariableLengthCompositeKeyCopySize(key);
-  for (;;) {
-    {
-      SpinSharedMutexSharedLock<true> resize_lock(resize_shared_mutex_);
-      std::uint8_t *value =
-          upsertCompositeKeyInternal(key, variable_size);
-      if (value != nullptr) {
-        SpinMutexLock lock(*(reinterpret_cast<SpinMutex *>(value)));
-        for (unsigned int k = 0; k < num_handles_; ++k) {
-          handles_[k]->mergeStates(source_state + payload_offsets_[k],
-                                   value + payload_offsets_[k]);
-        }
-        return true;
-      }
-    }
-    resize(0, variable_size);
-  }
-}
-
-template <typename FunctorT>
-inline bool PackedPayloadSeparateChainingAggregationStateHashTable
-    ::upsertCompositeKey(const std::vector<TypedValue> &key,
-                         FunctorT *functor,
-                         int index) {
-  const std::size_t variable_size =
-      calculateVariableLengthCompositeKeyCopySize(key);
-  for (;;) {
-    {
-      SpinSharedMutexSharedLock<true> resize_lock(resize_shared_mutex_);
-      std::uint8_t *value =
-          upsertCompositeKeyInternal(key, variable_size);
-      if (value != nullptr) {
-        (*functor)(value + payload_offsets_[index]);
-        return true;
-      }
-    }
-    resize(0, variable_size);
-  }
-}
-
-
-inline std::uint8_t* PackedPayloadSeparateChainingAggregationStateHashTable
-    ::upsertCompositeKeyInternal(const std::vector<TypedValue> &key,
-                                 const std::size_t variable_key_size) {
-  if (variable_key_size > 0) {
-    // Don't allocate yet, since the key may already be present. However, we
-    // do check if either the allocated variable storage space OR the free
-    // space is big enough to hold the key (at least one must be true: either
-    // the key is already present and allocated, or we need to be able to
-    // allocate enough space for it).
-    std::size_t allocated_bytes = header_->variable_length_bytes_allocated.load(
-        std::memory_order_relaxed);
-    if ((allocated_bytes < variable_key_size) &&
-        (allocated_bytes + variable_key_size >
-         key_manager_.getVariableLengthKeyStorageSize())) {
-      return nullptr;
-    }
-  }
-
-  const std::size_t hash_code = this->hashCompositeKey(key);
-  void *bucket = nullptr;
-  std::atomic<std::size_t> *pending_chain_ptr;
-  std::size_t pending_chain_ptr_finish_value;
-  for (;;) {
-    if (locateBucketForInsertion(hash_code,
-                                 variable_key_size,
-                                 &bucket,
-                                 &pending_chain_ptr,
-                                 &pending_chain_ptr_finish_value)) {
-      // Found an empty bucket.
-      break;
-    } else if (bucket == nullptr) {
-      // Ran out of buckets or variable-key space.
-      return nullptr;
-    } else if (key_manager_.compositeKeyCollisionCheck(key, bucket)) {
-      // Found an already-existing entry for this key.
-      return reinterpret_cast<std::uint8_t *>(static_cast<char *>(bucket) +
-                                              kValueOffset);
-    }
-  }
-
-  // We are now writing to an empty bucket.
-  // Write the key and hash.
-  writeCompositeKeyToBucket(key, hash_code, bucket);
-
-  std::uint8_t *value = static_cast<unsigned char *>(bucket) + kValueOffset;
-  std::memcpy(value, init_payload_, this->total_payload_size_);
-
-  // Update the previous chaining pointer to point to the new bucket.
-  pending_chain_ptr->store(pending_chain_ptr_finish_value,
-                           std::memory_order_release);
-
-  // Return the value.
-  return value;
-}
-
-template <bool use_two_accessors>
-inline bool PackedPayloadSeparateChainingAggregationStateHashTable
-    ::upsertValueAccessorCompositeKeyInternal(
-        const std::vector<std::vector<MultiSourceAttributeId>> &argument_ids,
-        const std::vector<MultiSourceAttributeId> &key_ids,
-        ValueAccessor *base_accessor,
-        ColumnVectorsValueAccessor *derived_accessor) {
-  std::size_t variable_size;
-  std::vector<TypedValue> key_vector;
-  key_vector.resize(key_ids.size());
-
-  return InvokeOnAnyValueAccessor(
-      base_accessor,
-      [&](auto *accessor) -> bool {  // NOLINT(build/c++11)
-    bool continuing = true;
-    while (continuing) {
-      {
-        continuing = false;
-        SpinSharedMutexSharedLock<true> lock(resize_shared_mutex_);
-        while (accessor->next()) {
-          if (use_two_accessors) {
-            derived_accessor->next();
-          }
-          if (this->GetCompositeKeyFromValueAccessor<use_two_accessors, true>(
-                  key_ids,
-                  accessor,
-                  derived_accessor,
-                  &key_vector)) {
-            continue;
-          }
-          variable_size = this->calculateVariableLengthCompositeKeyCopySize(key_vector);
-          std::uint8_t *value = this->upsertCompositeKeyInternal(
-              key_vector, variable_size);
-          if (value == nullptr) {
-            continuing = true;
-            break;
-          } else {
-            SpinMutexLock lock(*(reinterpret_cast<SpinMutex *>(value)));
-            for (unsigned int k = 0; k < num_handles_; ++k) {
-              const auto &ids = argument_ids[k];
-              if (ids.empty()) {
-                handles_[k]->updateStateNullary(value + payload_offsets_[k]);
-              } else {
-                const MultiSourceAttributeId &arg_id = ids.front();
-                if (use_two_accessors && arg_id.source == ValueAccessorSource::kDerived) {
-                  DCHECK_NE(arg_id.attr_id, kInvalidAttributeID);
-                  handles_[k]->updateStateUnary(derived_accessor->getTypedValue(arg_id.attr_id),
-                                                value + payload_offsets_[k]);
-                } else {
-                  handles_[k]->updateStateUnary(accessor->getTypedValue(arg_id.attr_id),
-                                                value + payload_offsets_[k]);
-                }
-              }
-            }
-          }
-        }
-      }
-      if (continuing) {
-        this->resize(0, variable_size);
-        accessor->previous();
-        if (use_two_accessors) {
-          derived_accessor->previous();
-        }
-      }
-    }
-    return true;
-  });
-}
-
-inline void PackedPayloadSeparateChainingAggregationStateHashTable
-    ::writeScalarKeyToBucket(const TypedValue &key,
-                             const std::size_t hash_code,
-                             void *bucket) {
-  *reinterpret_cast<std::size_t *>(static_cast<char *>(bucket) +
-                                   sizeof(std::atomic<std::size_t>)) =
-      hash_code;
-  key_manager_.writeKeyComponentToBucket(key, 0, bucket, nullptr);
-}
-
-inline void PackedPayloadSeparateChainingAggregationStateHashTable
-    ::writeCompositeKeyToBucket(const std::vector<TypedValue> &key,
-                                const std::size_t hash_code,
-                                void *bucket) {
-  DEBUG_ASSERT(key.size() == this->key_types_.size());
-  *reinterpret_cast<std::size_t *>(static_cast<char *>(bucket) +
-                                   sizeof(std::atomic<std::size_t>)) =
-      hash_code;
-  for (std::size_t idx = 0; idx < this->key_types_.size(); ++idx) {
-    key_manager_.writeKeyComponentToBucket(key[idx], idx, bucket, nullptr);
-  }
-}
-
-inline bool PackedPayloadSeparateChainingAggregationStateHashTable::isFull(
-    const std::size_t extra_variable_storage) const {
-  if (header_->buckets_allocated.load(std::memory_order_relaxed) >=
-      header_->num_buckets) {
-    // All buckets are allocated.
-    return true;
-  }
-
-  if (extra_variable_storage > 0) {
-    if (extra_variable_storage +
-            header_->variable_length_bytes_allocated.load(
-                std::memory_order_relaxed) >
-        key_manager_.getVariableLengthKeyStorageSize()) {
-      // Not enough variable-length key storage space.
-      return true;
-    }
-  }
-
-  return false;
-}
-
-template <typename FunctorT>
-inline std::size_t PackedPayloadSeparateChainingAggregationStateHashTable
-    ::forEach(FunctorT *functor) const {
-  std::size_t entries_visited = 0;
-  std::size_t entry_num = 0;
-  TypedValue key;
-  const std::uint8_t *value_ptr;
-  while (getNextEntry(&key, &value_ptr, &entry_num)) {
-    ++entries_visited;
-    (*functor)(key, value_ptr);
-  }
-  return entries_visited;
-}
-
-template <typename FunctorT>
-inline std::size_t PackedPayloadSeparateChainingAggregationStateHashTable
-    ::forEach(FunctorT *functor, const int index) const {
-  std::size_t entries_visited = 0;
-  std::size_t entry_num = 0;
-  TypedValue key;
-  const std::uint8_t *value_ptr;
-  while (getNextEntry(&key, &value_ptr, &entry_num)) {
-    ++entries_visited;
-    (*functor)(key, value_ptr + payload_offsets_[index]);
-    key.clear();
-  }
-  return entries_visited;
-}
-
-template <typename FunctorT>
-inline std::size_t PackedPayloadSeparateChainingAggregationStateHashTable
-    ::forEachCompositeKey(FunctorT *functor) const {
-  std::size_t entries_visited = 0;
-  std::size_t entry_num = 0;
-  std::vector<TypedValue> key;
-  const std::uint8_t *value_ptr;
-  while (getNextEntryCompositeKey(&key, &value_ptr, &entry_num)) {
-    ++entries_visited;
-    (*functor)(key, value_ptr);
-    key.clear();
-  }
-  return entries_visited;
-}
-
-template <typename FunctorT>
-inline std::size_t PackedPayloadSeparateChainingAggregationStateHashTable
-    ::forEachCompositeKey(FunctorT *functor,
-                          const int index) const {
-  std::size_t entries_visited = 0;
-  std::size_t entry_num = 0;
-  std::vector<TypedValue> key;
-  const std::uint8_t *value_ptr;
-  while (getNextEntryCompositeKey(&key, &value_ptr, &entry_num)) {
-    ++entries_visited;
-    (*functor)(key, value_ptr + payload_offsets_[index]);
-    key.clear();
-  }
-  return entries_visited;
-}
-
-}  // namespace quickstep
-
-#endif  // QUICKSTEP_STORAGE_PACKED_PAYLOAD_AGGREGATION_STATE_HASH_TABLE_HPP_
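
The HashTableMerger functor in the diff above is driven by forEachCompositeKey(): each (composite key, packed payload) pair of a source table is upserted into a destination table, which merges the per-handle states under that bucket's spin lock. A minimal sketch of how the two pieces fit together when folding thread-private partial tables into a shared result follows; the helper name mergeSourceIntoDestination is an illustrative assumption, not part of this patch, and it presumes the header defining these classes is already included and that both tables were built with identical key types and aggregation handles.

// Illustrative sketch only (not part of this patch). Assumes the header that
// declares HashTableMerger and
// PackedPayloadSeparateChainingAggregationStateHashTable is included.
namespace quickstep {

inline void mergeSourceIntoDestination(
    AggregationStateHashTableBase *source_hash_table,
    AggregationStateHashTableBase *destination_hash_table) {
  // The merger upserts every (group-by key, packed state) pair it is handed
  // into the destination via upsertCompositeKey(), which locks the target
  // bucket and calls AggregationHandle::mergeStates() for each handle.
  HashTableMerger merger(destination_hash_table);

  // forEachCompositeKey() visits every allocated bucket of the source table
  // and invokes the functor with the key vector and a pointer to that
  // bucket's packed aggregation states.
  static_cast<PackedPayloadSeparateChainingAggregationStateHashTable *>(
      source_hash_table)->forEachCompositeKey(&merger);
}

}  // namespace quickstep

Because upsertCompositeKey() acquires a per-bucket SpinMutexLock before calling mergeStates(), several workers could merge different source tables into the same destination concurrently.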