Updates
Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/1e7a92a9 Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/1e7a92a9 Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/1e7a92a9 Branch: refs/heads/collision-free-agg Commit: 1e7a92a94e0076d89151ea4a2ab4f68caa0572c0 Parents: 3bcb5c8 Author: Jianqiao Zhu <jianq...@cs.wisc.edu> Authored: Fri Feb 3 22:44:37 2017 -0600 Committer: Jianqiao Zhu <jianq...@cs.wisc.edu> Committed: Fri Feb 3 22:44:37 2017 -0600 ---------------------------------------------------------------------- .../aggregation/AggregationConcreteHandle.cpp | 4 +- .../aggregation/AggregationConcreteHandle.hpp | 21 +- expressions/aggregation/CMakeLists.txt | 2 +- storage/AggregationOperationState.cpp | 31 +- storage/CMakeLists.txt | 24 +- .../CollisionFreeAggregationStateHashTable.cpp | 285 ------- .../CollisionFreeAggregationStateHashTable.hpp | 621 -------------- storage/CollisionFreeVectorTable.cpp | 283 +++++++ storage/CollisionFreeVectorTable.hpp | 621 ++++++++++++++ storage/HashTableFactory.hpp | 8 +- .../PackedPayloadAggregationStateHashTable.cpp | 439 ---------- .../PackedPayloadAggregationStateHashTable.hpp | 805 ------------------- storage/PackedPayloadHashTable.cpp | 436 ++++++++++ storage/PackedPayloadHashTable.hpp | 798 ++++++++++++++++++ 14 files changed, 2180 insertions(+), 2198 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/1e7a92a9/expressions/aggregation/AggregationConcreteHandle.cpp ---------------------------------------------------------------------- diff --git a/expressions/aggregation/AggregationConcreteHandle.cpp b/expressions/aggregation/AggregationConcreteHandle.cpp index 5fd7e0f..bbce29f 100644 --- a/expressions/aggregation/AggregationConcreteHandle.cpp +++ b/expressions/aggregation/AggregationConcreteHandle.cpp @@ -24,7 +24,7 @@ #include "catalog/CatalogTypedefs.hpp" #include "storage/HashTableFactory.hpp" -#include "storage/PackedPayloadAggregationStateHashTable.hpp" +#include "storage/PackedPayloadHashTable.hpp" #include "storage/ValueAccessorMultiplexer.hpp" namespace quickstep { @@ -57,7 +57,7 @@ void AggregationConcreteHandle::insertValueAccessorIntoDistinctifyHashTable( concatenated_ids.emplace_back(arg_id); } - static_cast<PackedPayloadSeparateChainingAggregationStateHashTable *>(distinctify_hash_table) + static_cast<PackedPayloadHashTable *>(distinctify_hash_table) ->upsertValueAccessor({}, concatenated_ids, accessor_mux); } http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/1e7a92a9/expressions/aggregation/AggregationConcreteHandle.hpp ---------------------------------------------------------------------- diff --git a/expressions/aggregation/AggregationConcreteHandle.hpp b/expressions/aggregation/AggregationConcreteHandle.hpp index 5b49d0d..c8d61ff 100644 --- a/expressions/aggregation/AggregationConcreteHandle.hpp +++ b/expressions/aggregation/AggregationConcreteHandle.hpp @@ -29,7 +29,7 @@ #include "expressions/aggregation/AggregationHandle.hpp" #include "expressions/aggregation/AggregationID.hpp" #include "storage/HashTableBase.hpp" -#include "storage/PackedPayloadAggregationStateHashTable.hpp" +#include "storage/PackedPayloadHashTable.hpp" #include "storage/ValueAccessorMultiplexer.hpp" #include "threading/SpinMutex.hpp" #include "types/TypedValue.hpp" @@ -151,7 +151,7 @@ class AggregationConcreteHandle : public AggregationHandle { const std::size_t index, const std::vector<TypedValue> &group_key) const { const std::uint8_t *group_state = - static_cast<const PackedPayloadSeparateChainingAggregationStateHashTable &>(hash_table) + static_cast<const PackedPayloadHashTable &>(hash_table) .getSingleCompositeKey(group_key, index); DCHECK(group_state != nullptr) << "Could not find entry for specified group_key in HashTable"; @@ -217,8 +217,7 @@ StateT* AggregationConcreteHandle:: }; const auto &hash_table = - static_cast<const PackedPayloadSeparateChainingAggregationStateHashTable &>( - distinctify_hash_table); + static_cast<const PackedPayloadHashTable &>(distinctify_hash_table); // Invoke the lambda function "aggregate_functor" on each key from the // distinctify hash table. hash_table.forEach(&aggregate_functor); @@ -233,9 +232,8 @@ void AggregationConcreteHandle:: const std::size_t index, AggregationStateHashTableBase *aggregation_hash_table) const { const HandleT &handle = static_cast<const HandleT &>(*this); - PackedPayloadSeparateChainingAggregationStateHashTable *target_hash_table = - static_cast<PackedPayloadSeparateChainingAggregationStateHashTable *>( - aggregation_hash_table); + PackedPayloadHashTable *target_hash_table = + static_cast<PackedPayloadHashTable *>(aggregation_hash_table); // A lambda function which will be called on each key-value pair from the // distinctify hash table. @@ -256,9 +254,8 @@ void AggregationConcreteHandle:: target_hash_table->upsertCompositeKey(key, &upserter, index); }; - const PackedPayloadSeparateChainingAggregationStateHashTable &source_hash_table = - static_cast<const PackedPayloadSeparateChainingAggregationStateHashTable &>( - distinctify_hash_table); + const PackedPayloadHashTable &source_hash_table = + static_cast<const PackedPayloadHashTable &>(distinctify_hash_table); // Invoke the lambda function "aggregate_functor" on each composite key vector // from the distinctify hash table. source_hash_table.forEachCompositeKey(&aggregate_functor); @@ -271,8 +268,8 @@ ColumnVector* AggregationConcreteHandle::finalizeHashTableHelper( const std::size_t index, std::vector<std::vector<TypedValue>> *group_by_keys) const { const HandleT &handle = static_cast<const HandleT &>(*this); - const PackedPayloadSeparateChainingAggregationStateHashTable &hash_table_concrete = - static_cast<const PackedPayloadSeparateChainingAggregationStateHashTable &>(hash_table); + const PackedPayloadHashTable &hash_table_concrete = + static_cast<const PackedPayloadHashTable &>(hash_table); if (group_by_keys->empty()) { if (NativeColumnVector::UsableForType(result_type)) { http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/1e7a92a9/expressions/aggregation/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/expressions/aggregation/CMakeLists.txt b/expressions/aggregation/CMakeLists.txt index 0816db3..7203c8c 100644 --- a/expressions/aggregation/CMakeLists.txt +++ b/expressions/aggregation/CMakeLists.txt @@ -146,7 +146,7 @@ target_link_libraries(quickstep_expressions_aggregation_AggregationConcreteHandl quickstep_expressions_aggregation_AggregationID quickstep_storage_HashTableBase quickstep_storage_HashTableFactory - quickstep_storage_PackedPayloadAggregationStateHashTable + quickstep_storage_PackedPayloadHashTable quickstep_storage_ValueAccessorMultiplexer quickstep_threading_SpinMutex quickstep_types_TypedValue http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/1e7a92a9/storage/AggregationOperationState.cpp ---------------------------------------------------------------------- diff --git a/storage/AggregationOperationState.cpp b/storage/AggregationOperationState.cpp index a393185..4ffd418 100644 --- a/storage/AggregationOperationState.cpp +++ b/storage/AggregationOperationState.cpp @@ -38,10 +38,11 @@ #include "expressions/predicate/Predicate.hpp" #include "expressions/scalar/Scalar.hpp" #include "storage/AggregationOperationState.pb.h" +#include "storage/CollisionFreeVectorTable.hpp" #include "storage/HashTableFactory.hpp" #include "storage/HashTableBase.hpp" #include "storage/InsertDestination.hpp" -#include "storage/PackedPayloadAggregationStateHashTable.hpp" +#include "storage/PackedPayloadHashTable.hpp" #include "storage/StorageBlock.hpp" #include "storage/StorageBlockInfo.hpp" #include "storage/StorageManager.hpp" @@ -353,7 +354,7 @@ bool AggregationOperationState::ProtoIsValid( std::size_t AggregationOperationState::getNumPartitions() const { if (is_aggregate_collision_free_) { - return static_cast<CollisionFreeAggregationStateHashTable *>( + return static_cast<CollisionFreeVectorTable *>( collision_free_hashtable_.get())->getNumFinalizationPartitions(); } else if (is_aggregate_partitioned_) { return partitioned_group_by_hashtable_pool_->getNumPartitions(); @@ -364,7 +365,7 @@ std::size_t AggregationOperationState::getNumPartitions() const { std::size_t AggregationOperationState::getNumInitializationPartitions() const { if (is_aggregate_collision_free_) { - return static_cast<CollisionFreeAggregationStateHashTable *>( + return static_cast<CollisionFreeVectorTable *>( collision_free_hashtable_.get())->getNumInitializationPartitions(); } else { return 0u; @@ -373,7 +374,7 @@ std::size_t AggregationOperationState::getNumInitializationPartitions() const { void AggregationOperationState::initializeState(const std::size_t partition_id) { if (is_aggregate_collision_free_) { - static_cast<CollisionFreeAggregationStateHashTable *>( + static_cast<CollisionFreeVectorTable *>( collision_free_hashtable_.get())->initialize(partition_id); } else { LOG(FATAL) << "AggregationOperationState::initializeState() " @@ -512,10 +513,10 @@ void AggregationOperationState::mergeSingleState( } void AggregationOperationState::mergeGroupByHashTables( - AggregationStateHashTableBase *src, AggregationStateHashTableBase *dst) const { - HashTableMerger merger(dst); - static_cast<PackedPayloadSeparateChainingAggregationStateHashTable *>(src) - ->forEachCompositeKey(&merger); + AggregationStateHashTableBase *src, + AggregationStateHashTableBase *dst) const { + HashTableMerger merger(static_cast<PackedPayloadHashTable *>(dst)); + static_cast<PackedPayloadHashTable *>(src)->forEachCompositeKey(&merger); } void AggregationOperationState::aggregateBlockHashTable( @@ -661,9 +662,8 @@ void AggregationOperationState::finalizeHashTableImplCollisionFree( const std::size_t partition_id, InsertDestination *output_destination) { std::vector<std::unique_ptr<ColumnVector>> final_values; - CollisionFreeAggregationStateHashTable *hash_table = - static_cast<CollisionFreeAggregationStateHashTable *>( - collision_free_hashtable_.get()); + CollisionFreeVectorTable *hash_table = + static_cast<CollisionFreeVectorTable *>(collision_free_hashtable_.get()); // TODO const std::size_t max_length = @@ -696,8 +696,8 @@ void AggregationOperationState::finalizeHashTableImplCollisionFree( void AggregationOperationState::finalizeHashTableImplPartitioned( const std::size_t partition_id, InsertDestination *output_destination) { - PackedPayloadSeparateChainingAggregationStateHashTable *hash_table = - static_cast<PackedPayloadSeparateChainingAggregationStateHashTable *>( + PackedPayloadHashTable *hash_table = + static_cast<PackedPayloadHashTable *>( partitioned_group_by_hashtable_pool_->getHashTable(partition_id)); // Each element of 'group_by_keys' is a vector of values for a particular @@ -790,9 +790,8 @@ void AggregationOperationState::finalizeHashTableImplThreadPrivate( hash_table->destroyPayload(); } - PackedPayloadSeparateChainingAggregationStateHashTable *final_hash_table = - static_cast<PackedPayloadSeparateChainingAggregationStateHashTable *>( - final_hash_table_ptr.get()); + PackedPayloadHashTable *final_hash_table = + static_cast<PackedPayloadHashTable *>(final_hash_table_ptr.get()); // Each element of 'group_by_keys' is a vector of values for a particular // group (which is also the prefix of the finalized Tuple for that group). http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/1e7a92a9/storage/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/storage/CMakeLists.txt b/storage/CMakeLists.txt index d43d7a2..8fbb4ea 100644 --- a/storage/CMakeLists.txt +++ b/storage/CMakeLists.txt @@ -165,9 +165,9 @@ if(QUICKSTEP_HAVE_BITWEAVING) bitweaving/BitWeavingVIndexSubBlock.hpp) endif() # CMAKE_VALIDATE_IGNORE_END -add_library(quickstep_storage_CollisionFreeAggregationStateHashTable - CollisionFreeAggregationStateHashTable.cpp - CollisionFreeAggregationStateHashTable.hpp) +add_library(quickstep_storage_CollisionFreeVectorTable + CollisionFreeVectorTable.cpp + CollisionFreeVectorTable.hpp) add_library(quickstep_storage_ColumnStoreUtil ColumnStoreUtil.cpp ColumnStoreUtil.hpp) add_library(quickstep_storage_CompressedBlockBuilder CompressedBlockBuilder.cpp CompressedBlockBuilder.hpp) add_library(quickstep_storage_CompressedColumnStoreTupleStorageSubBlock @@ -225,9 +225,7 @@ add_library(quickstep_storage_InsertDestination_proto add_library(quickstep_storage_LinearOpenAddressingHashTable ../empty_src.cpp LinearOpenAddressingHashTable.hpp) -add_library(quickstep_storage_PackedPayloadAggregationStateHashTable - PackedPayloadAggregationStateHashTable.cpp - PackedPayloadAggregationStateHashTable.hpp) +add_library(quickstep_storage_PackedPayloadHashTable PackedPayloadHashTable.cpp PackedPayloadHashTable.hpp) add_library(quickstep_storage_PartitionedHashTablePool ../empty_src.cpp PartitionedHashTablePool.hpp) add_library(quickstep_storage_PreloaderThread PreloaderThread.cpp PreloaderThread.hpp) add_library(quickstep_storage_SMAIndexSubBlock SMAIndexSubBlock.cpp SMAIndexSubBlock.hpp) @@ -284,7 +282,7 @@ target_link_libraries(quickstep_storage_AggregationOperationState quickstep_storage_HashTablePool quickstep_storage_InsertDestination quickstep_storage_PartitionedHashTablePool - quickstep_storage_PackedPayloadAggregationStateHashTable + quickstep_storage_PackedPayloadHashTable quickstep_storage_StorageBlock quickstep_storage_StorageBlockInfo quickstep_storage_StorageManager @@ -435,7 +433,7 @@ if(QUICKSTEP_HAVE_BITWEAVING) quickstep_utility_Macros) endif() # CMAKE_VALIDATE_IGNORE_END -target_link_libraries(quickstep_storage_CollisionFreeAggregationStateHashTable +target_link_libraries(quickstep_storage_CollisionFreeVectorTable quickstep_catalog_CatalogTypedefs quickstep_expressions_aggregation_AggregationHandle quickstep_expressions_aggregation_AggregationID @@ -714,12 +712,12 @@ target_link_libraries(quickstep_storage_HashTable_proto ${PROTOBUF_LIBRARY}) target_link_libraries(quickstep_storage_HashTableFactory glog - quickstep_storage_CollisionFreeAggregationStateHashTable + quickstep_storage_CollisionFreeVectorTable quickstep_storage_HashTable quickstep_storage_HashTable_proto quickstep_storage_HashTableBase quickstep_storage_LinearOpenAddressingHashTable - quickstep_storage_PackedPayloadAggregationStateHashTable + quickstep_storage_PackedPayloadHashTable quickstep_storage_SeparateChainingHashTable quickstep_storage_SimpleScalarSeparateChainingHashTable quickstep_storage_TupleReference @@ -798,7 +796,7 @@ target_link_libraries(quickstep_storage_LinearOpenAddressingHashTable quickstep_utility_Alignment quickstep_utility_Macros quickstep_utility_PrimeNumber) -target_link_libraries(quickstep_storage_PackedPayloadAggregationStateHashTable +target_link_libraries(quickstep_storage_PackedPayloadHashTable quickstep_catalog_CatalogTypedefs quickstep_expressions_aggregation_AggregationHandle quickstep_storage_HashTableBase @@ -1115,7 +1113,7 @@ target_link_libraries(quickstep_storage quickstep_storage_BasicColumnStoreValueAccessor quickstep_storage_BloomFilterIndexSubBlock quickstep_storage_CSBTreeIndexSubBlock - quickstep_storage_CollisionFreeAggregationStateHashTable + quickstep_storage_CollisionFreeVectorTable quickstep_storage_ColumnStoreUtil quickstep_storage_CompressedBlockBuilder quickstep_storage_CompressedColumnStoreTupleStorageSubBlock @@ -1141,7 +1139,7 @@ target_link_libraries(quickstep_storage quickstep_storage_InsertDestination_proto quickstep_storage_LinearOpenAddressingHashTable quickstep_storage_PartitionedHashTablePool - quickstep_storage_PackedPayloadAggregationStateHashTable + quickstep_storage_PackedPayloadHashTable quickstep_storage_PreloaderThread quickstep_storage_SMAIndexSubBlock quickstep_storage_SeparateChainingHashTable http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/1e7a92a9/storage/CollisionFreeAggregationStateHashTable.cpp ---------------------------------------------------------------------- diff --git a/storage/CollisionFreeAggregationStateHashTable.cpp b/storage/CollisionFreeAggregationStateHashTable.cpp deleted file mode 100644 index 2f3b336..0000000 --- a/storage/CollisionFreeAggregationStateHashTable.cpp +++ /dev/null @@ -1,285 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - **/ - -#include "storage/CollisionFreeAggregationStateHashTable.hpp" - -#include <algorithm> -#include <atomic> -#include <cstddef> -#include <cstdint> -#include <cstdlib> -#include <map> -#include <memory> -#include <vector> - -#include "storage/StorageBlockInfo.hpp" -#include "storage/StorageManager.hpp" -#include "storage/ValueAccessor.hpp" -#include "storage/ValueAccessorMultiplexer.hpp" -#include "storage/ValueAccessorUtil.hpp" -#include "types/containers/ColumnVectorsValueAccessor.hpp" -#include "utility/BarrieredReadWriteConcurrentBitVector.hpp" - -namespace quickstep { - -CollisionFreeAggregationStateHashTable::CollisionFreeAggregationStateHashTable( - const std::vector<const Type *> &key_types, - const std::size_t num_entries, - const std::vector<AggregationHandle *> &handles, - StorageManager *storage_manager) - : key_type_(key_types.front()), - num_entries_(num_entries), - num_handles_(handles.size()), - handles_(handles), - num_finalize_partitions_(std::min((num_entries_ >> 12u) + 1u, 80uL)), - storage_manager_(storage_manager) { - CHECK_EQ(1u, key_types.size()); - DCHECK_GT(num_entries, 0u); - - std::map<std::string, std::size_t> memory_offsets; - std::size_t required_memory = 0; - - memory_offsets.emplace("existence_map", required_memory); - required_memory += CacheLineAlignedBytes( - BarrieredReadWriteConcurrentBitVector::BytesNeeded(num_entries)); - - for (std::size_t i = 0; i < num_handles_; ++i) { - const AggregationHandle *handle = handles_[i]; - const std::vector<const Type *> argument_types = handle->getArgumentTypes(); - - std::size_t state_size = 0; - switch (handle->getAggregationID()) { - case AggregationID::kCount: { - state_size = sizeof(std::atomic<std::size_t>); - break; - } - case AggregationID::kSum: { - CHECK_EQ(1u, argument_types.size()); - switch (argument_types.front()->getTypeID()) { - case TypeID::kInt: // Fall through - case TypeID::kLong: - state_size = sizeof(std::atomic<std::int64_t>); - break; - case TypeID::kFloat: // Fall through - case TypeID::kDouble: - state_size = sizeof(std::atomic<double>); - break; - default: - LOG(FATAL) << "Not implemented"; - } - break; - } - default: - LOG(FATAL) << "Not implemented"; - } - - memory_offsets.emplace(std::string("state") + std::to_string(i), - required_memory); - required_memory += CacheLineAlignedBytes(state_size * num_entries); - } - - const std::size_t num_storage_slots = - storage_manager_->SlotsNeededForBytes(required_memory); - - const block_id blob_id = storage_manager_->createBlob(num_storage_slots); - blob_ = storage_manager_->getBlobMutable(blob_id); - - void *memory_start = blob_->getMemoryMutable(); - existence_map_.reset(new BarrieredReadWriteConcurrentBitVector( - reinterpret_cast<char *>(memory_start) + memory_offsets.at("existence_map"), - num_entries, - false /* initialize */)); - - for (std::size_t i = 0; i < num_handles_; ++i) { - vec_tables_.emplace_back( - reinterpret_cast<char *>(memory_start) + - memory_offsets.at(std::string("state") + std::to_string(i))); - } - - memory_size_ = required_memory; - num_init_partitions_ = - std::max(1uL, std::min(memory_size_ / (4uL * 1024u * 1024u), 80uL)); -} - -CollisionFreeAggregationStateHashTable::~CollisionFreeAggregationStateHashTable() { - const block_id blob_id = blob_->getID(); - blob_.release(); - storage_manager_->deleteBlockOrBlobFile(blob_id); -} - -void CollisionFreeAggregationStateHashTable::destroyPayload() { -} - -bool CollisionFreeAggregationStateHashTable::upsertValueAccessor( - const std::vector<std::vector<MultiSourceAttributeId>> &argument_ids, - const std::vector<MultiSourceAttributeId> &key_ids, - ValueAccessorMultiplexer *accessor_mux) { - DCHECK_EQ(1u, key_ids.size()); - - const ValueAccessorSource key_source = key_ids.front().source; - const attribute_id key_id = key_ids.front().attr_id; - const bool is_key_nullable = key_type_->isNullable(); - - if (handles_.empty()) { - InvokeOnValueAccessorMaybeTupleIdSequenceAdapter( - accessor_mux->getValueAccessorBySource(key_source), - [&](auto *accessor) -> void { // NOLINT(build/c++11) - upsertValueAccessorKeyOnlyHelper(is_key_nullable, - key_type_, - key_id, - accessor); - }); - return true; - } - - DCHECK(accessor_mux->getDerivedAccessor()->getImplementationType() - == ValueAccessor::Implementation::kColumnVectors); - ValueAccessor *base_accessor = accessor_mux->getBaseAccessor(); - ColumnVectorsValueAccessor *derived_accesor = - static_cast<ColumnVectorsValueAccessor *>(accessor_mux->getDerivedAccessor()); - - for (std::size_t i = 0; i < num_handles_; ++i) { - DCHECK_LE(argument_ids[i].size(), 1u); - - const AggregationHandle *handle = handles_[i]; - const auto &argument_types = handle->getArgumentTypes(); - const auto &argument_ids_i = argument_ids[i]; - - ValueAccessorSource argument_source; - attribute_id argument_id; - const Type *argument_type; - bool is_argument_nullable; - - if (argument_ids_i.empty()) { - argument_source = ValueAccessorSource::kInvalid; - argument_id = kInvalidAttributeID; - - DCHECK(argument_types.empty()); - argument_type = nullptr; - is_argument_nullable = false; - } else { - DCHECK_EQ(1u, argument_ids_i.size()); - argument_source = argument_ids_i.front().source; - argument_id = argument_ids_i.front().attr_id; - - DCHECK_EQ(1u, argument_types.size()); - argument_type = argument_types.front(); - is_argument_nullable = argument_type->isNullable(); - } - - InvokeOnValueAccessorMaybeTupleIdSequenceAdapter( - base_accessor, - [&](auto *accessor) -> void { // NOLINT(build/c++11) - if (key_source == ValueAccessorSource::kBase) { - if (argument_source == ValueAccessorSource::kBase) { - upsertValueAccessorDispatchHelper<false>(is_key_nullable, - is_argument_nullable, - key_type_, - argument_type, - handle->getAggregationID(), - key_id, - argument_id, - vec_tables_[i], - accessor, - accessor); - } else { - upsertValueAccessorDispatchHelper<true>(is_key_nullable, - is_argument_nullable, - key_type_, - argument_type, - handle->getAggregationID(), - key_id, - argument_id, - vec_tables_[i], - accessor, - derived_accesor); - } - } else { - if (argument_source == ValueAccessorSource::kBase) { - upsertValueAccessorDispatchHelper<true>(is_key_nullable, - is_argument_nullable, - key_type_, - argument_type, - handle->getAggregationID(), - key_id, - argument_id, - vec_tables_[i], - derived_accesor, - accessor); - } else { - upsertValueAccessorDispatchHelper<false>(is_key_nullable, - is_argument_nullable, - key_type_, - argument_type, - handle->getAggregationID(), - key_id, - argument_id, - vec_tables_[i], - derived_accesor, - derived_accesor); - } - } - }); - } - return true; -} - -void CollisionFreeAggregationStateHashTable::finalizeKey( - const std::size_t partition_id, - NativeColumnVector *output_cv) const { - const std::size_t start_position = - calculatePartitionStartPosition(partition_id); - const std::size_t end_position = - calculatePartitionEndPosition(partition_id); - - switch (key_type_->getTypeID()) { - case TypeID::kInt: - finalizeKeyInternal<int>(start_position, end_position, output_cv); - return; - case TypeID::kLong: - finalizeKeyInternal<std::int64_t>(start_position, end_position, output_cv); - return; - default: - LOG(FATAL) << "Not supported"; - } -} - -void CollisionFreeAggregationStateHashTable::finalizeState( - const std::size_t partition_id, - std::size_t handle_id, - NativeColumnVector *output_cv) const { - const std::size_t start_position = - calculatePartitionStartPosition(partition_id); - const std::size_t end_position = - calculatePartitionEndPosition(partition_id); - - const AggregationHandle *handle = handles_[handle_id]; - const auto &argument_types = handle->getArgumentTypes(); - const Type *argument_type = - argument_types.empty() ? nullptr : argument_types.front(); - - finalizeStateDispatchHelper(handle->getAggregationID(), - argument_type, - vec_tables_[handle_id], - start_position, - end_position, - output_cv); -} - -} // namespace quickstep http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/1e7a92a9/storage/CollisionFreeAggregationStateHashTable.hpp ---------------------------------------------------------------------- diff --git a/storage/CollisionFreeAggregationStateHashTable.hpp b/storage/CollisionFreeAggregationStateHashTable.hpp deleted file mode 100644 index d738e4e..0000000 --- a/storage/CollisionFreeAggregationStateHashTable.hpp +++ /dev/null @@ -1,621 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - **/ - -#ifndef QUICKSTEP_STORAGE_COLLISION_FREE_AGGREGATION_STATE_HASH_TABLE_HPP_ -#define QUICKSTEP_STORAGE_COLLISION_FREE_AGGREGATION_STATE_HASH_TABLE_HPP_ - -#include <atomic> -#include <cstddef> -#include <cstdint> -#include <cstring> -#include <memory> -#include <type_traits> -#include <utility> -#include <vector> - -#include "catalog/CatalogTypedefs.hpp" -#include "expressions/aggregation/AggregationHandle.hpp" -#include "expressions/aggregation/AggregationID.hpp" -#include "storage/HashTableBase.hpp" -#include "storage/StorageBlob.hpp" -#include "storage/StorageConstants.hpp" -#include "storage/ValueAccessor.hpp" -#include "storage/ValueAccessorMultiplexer.hpp" -#include "types/Type.hpp" -#include "types/TypeID.hpp" -#include "types/TypedValue.hpp" -#include "types/containers/ColumnVector.hpp" -#include "utility/BarrieredReadWriteConcurrentBitVector.hpp" -#include "utility/Macros.hpp" - -#include "glog/logging.h" - -namespace quickstep { - -class ColumnVectorsValueAccessor; -class StorageMnager; - -/** \addtogroup Storage - * @{ - */ - -class CollisionFreeAggregationStateHashTable : public AggregationStateHashTableBase { - public: - CollisionFreeAggregationStateHashTable( - const std::vector<const Type *> &key_types, - const std::size_t num_entries, - const std::vector<AggregationHandle *> &handles, - StorageManager *storage_manager); - - ~CollisionFreeAggregationStateHashTable() override; - - void destroyPayload() override; - - inline std::size_t getNumInitializationPartitions() const { - return num_init_partitions_; - } - - inline std::size_t getNumFinalizationPartitions() const { - return num_finalize_partitions_; - } - - inline std::size_t getNumTuplesInPartition( - const std::size_t partition_id) const { - const std::size_t start_position = - calculatePartitionStartPosition(partition_id); - const std::size_t end_position = - calculatePartitionEndPosition(partition_id); - return existence_map_->onesCountInRange(start_position, end_position); - } - - inline void initialize(const std::size_t partition_id) { - const std::size_t memory_segment_size = - (memory_size_ + num_init_partitions_ - 1) / num_init_partitions_; - const std::size_t memory_start = memory_segment_size * partition_id; - std::memset(reinterpret_cast<char *>(blob_->getMemoryMutable()) + memory_start, - 0, - std::min(memory_segment_size, memory_size_ - memory_start)); - } - - bool upsertValueAccessor( - const std::vector<std::vector<MultiSourceAttributeId>> &argument_ids, - const std::vector<MultiSourceAttributeId> &key_ids, - ValueAccessorMultiplexer *accessor_mux) override; - - void finalizeKey(const std::size_t partition_id, - NativeColumnVector *output_cv) const; - - void finalizeState(const std::size_t partition_id, - std::size_t handle_id, - NativeColumnVector *output_cv) const; - - private: - inline static std::size_t CacheLineAlignedBytes(const std::size_t actual_bytes) { - return (actual_bytes + kCacheLineBytes - 1) / kCacheLineBytes * kCacheLineBytes; - } - - inline std::size_t calculatePartitionLength() const { - const std::size_t partition_length = - (num_entries_ + num_finalize_partitions_ - 1) / num_finalize_partitions_; - DCHECK_GE(partition_length, 0u); - return partition_length; - } - - inline std::size_t calculatePartitionStartPosition( - const std::size_t partition_id) const { - return calculatePartitionLength() * partition_id; - } - - inline std::size_t calculatePartitionEndPosition( - const std::size_t partition_id) const { - return std::min(calculatePartitionLength() * (partition_id + 1), - num_entries_); - } - - template <bool use_two_accessors, typename ...ArgTypes> - inline void upsertValueAccessorDispatchHelper( - const bool is_key_nullable, - const bool is_argument_nullable, - ArgTypes &&...args); - - template <bool ...bool_values, typename ...ArgTypes> - inline void upsertValueAccessorDispatchHelper( - const Type *key_type, - ArgTypes &&...args); - - template <bool use_two_accessors, bool is_key_nullable, bool is_argument_nullable, - typename KeyT, typename ...ArgTypes> - inline void upsertValueAccessorDispatchHelper( - const Type *argument_type, - const AggregationID agg_id, - ArgTypes &&...args); - - template <bool use_two_accessors, bool is_key_nullable, bool is_argument_nullable, - typename KeyT, typename KeyValueAccessorT, typename ArgumentValueAccessorT> - inline void upsertValueAccessorCountHelper( - const attribute_id key_attr_id, - const attribute_id argument_id, - void *vec_table, - KeyValueAccessorT *key_accessor, - ArgumentValueAccessorT *argument_accessor); - - template <bool use_two_accessors, bool is_key_nullable, bool is_argument_nullable, - typename KeyT, typename KeyValueAccessorT, typename ArgumentValueAccessorT> - inline void upsertValueAccessorSumHelper( - const Type *argument_type, - const attribute_id key_attr_id, - const attribute_id argument_id, - void *vec_table, - KeyValueAccessorT *key_accessor, - ArgumentValueAccessorT *argument_accessor); - - template <typename ...ArgTypes> - inline void upsertValueAccessorKeyOnlyHelper( - const bool is_key_nullable, - const Type *key_type, - ArgTypes &&...args); - - template <bool is_key_nullable, typename KeyT, typename KeyValueAccessorT> - inline void upsertValueAccessorKeyOnly( - const attribute_id key_attr_id, - KeyValueAccessorT *key_accessor); - - template <bool is_key_nullable, typename KeyT, typename KeyValueAccessorT> - inline void upsertValueAccessorCountNullary( - const attribute_id key_attr_id, - std::atomic<std::size_t> *vec_table, - KeyValueAccessorT *key_accessor); - - template <bool use_two_accessors, bool is_key_nullable, typename KeyT, - typename KeyValueAccessorT, typename ArgumentValueAccessorT> - inline void upsertValueAccessorCountUnary( - const attribute_id key_attr_id, - const attribute_id argument_id, - std::atomic<std::size_t> *vec_table, - KeyValueAccessorT *key_accessor, - ArgumentValueAccessorT *argument_accessor); - - template <bool use_two_accessors, bool is_key_nullable, bool is_argument_nullable, - typename KeyT, typename ArgumentT, typename StateT, - typename KeyValueAccessorT, typename ArgumentValueAccessorT> - inline void upsertValueAccessorIntegerSum( - const attribute_id key_attr_id, - const attribute_id argument_id, - std::atomic<StateT> *vec_table, - KeyValueAccessorT *key_accessor, - ArgumentValueAccessorT *argument_accessor); - - template <bool use_two_accessors, bool is_key_nullable, bool is_argument_nullable, - typename KeyT, typename ArgumentT, typename StateT, - typename KeyValueAccessorT, typename ArgumentValueAccessorT> - inline void upsertValueAccessorGenericSum( - const attribute_id key_attr_id, - const attribute_id argument_id, - std::atomic<StateT> *vec_table, - KeyValueAccessorT *key_accessor, - ArgumentValueAccessorT *argument_accessor); - - template <typename KeyT> - inline void finalizeKeyInternal(const std::size_t start_position, - const std::size_t end_position, - NativeColumnVector *output_cv) const { - std::size_t loc = start_position - 1; - while ((loc = existence_map_->nextOne(loc)) < end_position) { - *static_cast<KeyT *>(output_cv->getPtrForDirectWrite()) = loc; - } - } - - template <typename ...ArgTypes> - inline void finalizeStateDispatchHelper(const AggregationID agg_id, - const Type *argument_type, - const void *vec_table, - ArgTypes &&...args) const { - switch (agg_id) { - case AggregationID::kCount: - finalizeStateCount(static_cast<const std::atomic<std::size_t> *>(vec_table), - std::forward<ArgTypes>(args)...); - return; - case AggregationID::kSum: - finalizeStateSumHelper(argument_type, - vec_table, - std::forward<ArgTypes>(args)...); - return; - default: - LOG(FATAL) << "Not supported"; - } - } - - template <typename ...ArgTypes> - inline void finalizeStateSumHelper(const Type *argument_type, - const void *vec_table, - ArgTypes &&...args) const { - DCHECK(argument_type != nullptr); - - switch (argument_type->getTypeID()) { - case TypeID::kInt: // Fall through - case TypeID::kLong: - finalizeStateSum<std::int64_t>( - static_cast<const std::atomic<std::int64_t> *>(vec_table), - std::forward<ArgTypes>(args)...); - return; - case TypeID::kFloat: // Fall through - case TypeID::kDouble: - finalizeStateSum<double>( - static_cast<const std::atomic<double> *>(vec_table), - std::forward<ArgTypes>(args)...); - return; - default: - LOG(FATAL) << "Not supported"; - } - } - - inline void finalizeStateCount(const std::atomic<std::size_t> *vec_table, - const std::size_t start_position, - const std::size_t end_position, - NativeColumnVector *output_cv) const { - std::size_t loc = start_position - 1; - while ((loc = existence_map_->nextOne(loc)) < end_position) { - *static_cast<std::int64_t *>(output_cv->getPtrForDirectWrite()) = - vec_table[loc].load(std::memory_order_relaxed); - } - } - - template <typename ResultT, typename StateT> - inline void finalizeStateSum(const std::atomic<StateT> *vec_table, - const std::size_t start_position, - const std::size_t end_position, - NativeColumnVector *output_cv) const { - std::size_t loc = start_position - 1; - while ((loc = existence_map_->nextOne(loc)) < end_position) { - *static_cast<ResultT *>(output_cv->getPtrForDirectWrite()) = - vec_table[loc].load(std::memory_order_relaxed); - } - } - - const Type *key_type_; - const std::size_t num_entries_; - - const std::size_t num_handles_; - const std::vector<AggregationHandle *> handles_; - - std::unique_ptr<BarrieredReadWriteConcurrentBitVector> existence_map_; - std::vector<void *> vec_tables_; - - const std::size_t num_finalize_partitions_; - - StorageManager *storage_manager_; - MutableBlobReference blob_; - - std::size_t memory_size_; - std::size_t num_init_partitions_; - - DISALLOW_COPY_AND_ASSIGN(CollisionFreeAggregationStateHashTable); -}; - -// ---------------------------------------------------------------------------- -// Implementations of template methods follow. - -template <bool use_two_accessors, typename ...ArgTypes> -inline void CollisionFreeAggregationStateHashTable - ::upsertValueAccessorDispatchHelper(const bool is_key_nullable, - const bool is_argument_nullable, - ArgTypes &&...args) { - if (is_key_nullable) { - if (is_argument_nullable) { - upsertValueAccessorDispatchHelper<use_two_accessors, true, true>( - std::forward<ArgTypes>(args)...); - } else { - upsertValueAccessorDispatchHelper<use_two_accessors, true, false>( - std::forward<ArgTypes>(args)...); - } - } else { - if (is_argument_nullable) { - upsertValueAccessorDispatchHelper<use_two_accessors, false, true>( - std::forward<ArgTypes>(args)...); - } else { - upsertValueAccessorDispatchHelper<use_two_accessors, false, false>( - std::forward<ArgTypes>(args)...); - } - } -} - -template <bool ...bool_values, typename ...ArgTypes> -inline void CollisionFreeAggregationStateHashTable - ::upsertValueAccessorDispatchHelper(const Type *key_type, - ArgTypes &&...args) { - switch (key_type->getTypeID()) { - case TypeID::kInt: - upsertValueAccessorDispatchHelper<bool_values..., int>( - std::forward<ArgTypes>(args)...); - return; - case TypeID::kLong: - upsertValueAccessorDispatchHelper<bool_values..., std::int64_t>( - std::forward<ArgTypes>(args)...); - return; - default: - LOG(FATAL) << "Not supported"; - } -} - -template <bool use_two_accessors, bool is_key_nullable, bool is_argument_nullable, - typename KeyT, typename ...ArgTypes> -inline void CollisionFreeAggregationStateHashTable - ::upsertValueAccessorDispatchHelper(const Type *argument_type, - const AggregationID agg_id, - ArgTypes &&...args) { - switch (agg_id) { - case AggregationID::kCount: - upsertValueAccessorCountHelper< - use_two_accessors, is_key_nullable, is_argument_nullable, KeyT>( - std::forward<ArgTypes>(args)...); - return; - case AggregationID::kSum: - upsertValueAccessorSumHelper< - use_two_accessors, is_key_nullable, is_argument_nullable, KeyT>( - argument_type, std::forward<ArgTypes>(args)...); - return; - default: - LOG(FATAL) << "Not supported"; - } -} - -template <typename ...ArgTypes> -inline void CollisionFreeAggregationStateHashTable - ::upsertValueAccessorKeyOnlyHelper(const bool is_key_nullable, - const Type *key_type, - ArgTypes &&...args) { - switch (key_type->getTypeID()) { - case TypeID::kInt: { - if (is_key_nullable) { - upsertValueAccessorKeyOnly<true, int>(std::forward<ArgTypes>(args)...); - } else { - upsertValueAccessorKeyOnly<false, int>(std::forward<ArgTypes>(args)...); - } - return; - } - case TypeID::kLong: { - if (is_key_nullable) { - upsertValueAccessorKeyOnly<true, std::int64_t>(std::forward<ArgTypes>(args)...); - } else { - upsertValueAccessorKeyOnly<false, std::int64_t>(std::forward<ArgTypes>(args)...); - } - return; - } - default: - LOG(FATAL) << "Not supported"; - } -} - -template <bool is_key_nullable, typename KeyT, typename ValueAccessorT> -inline void CollisionFreeAggregationStateHashTable - ::upsertValueAccessorKeyOnly(const attribute_id key_attr_id, - ValueAccessorT *accessor) { - accessor->beginIteration(); - while (accessor->next()) { - const KeyT *key = static_cast<const KeyT *>( - accessor->template getUntypedValue<is_key_nullable>(key_attr_id)); - if (is_key_nullable && key == nullptr) { - continue; - } - existence_map_->setBit(*key); - } -} - -template <bool use_two_accessors, bool is_key_nullable, bool is_argument_nullable, - typename KeyT, typename KeyValueAccessorT, typename ArgumentValueAccessorT> -inline void CollisionFreeAggregationStateHashTable - ::upsertValueAccessorCountHelper(const attribute_id key_attr_id, - const attribute_id argument_id, - void *vec_table, - KeyValueAccessorT *key_accessor, - ArgumentValueAccessorT *argument_accessor) { - DCHECK_GE(key_attr_id, 0u); - - if (is_argument_nullable && argument_id != kInvalidAttributeID) { - upsertValueAccessorCountUnary<use_two_accessors, is_key_nullable, KeyT>( - key_attr_id, - argument_id, - static_cast<std::atomic<std::size_t> *>(vec_table), - key_accessor, - argument_accessor); - return; - } else { - upsertValueAccessorCountNullary<is_key_nullable, KeyT>( - key_attr_id, - static_cast<std::atomic<std::size_t> *>(vec_table), - key_accessor); - return; - } -} - -template <bool use_two_accessors, bool is_key_nullable, bool is_argument_nullable, - typename KeyT, typename KeyValueAccessorT, typename ArgumentValueAccessorT> -inline void CollisionFreeAggregationStateHashTable - ::upsertValueAccessorSumHelper(const Type *argument_type, - const attribute_id key_attr_id, - const attribute_id argument_id, - void *vec_table, - KeyValueAccessorT *key_accessor, - ArgumentValueAccessorT *argument_accessor) { - DCHECK_GE(key_attr_id, 0u); - DCHECK_GE(argument_id, 0u); - DCHECK(argument_type != nullptr); - - switch (argument_type->getTypeID()) { - case TypeID::kInt: - upsertValueAccessorIntegerSum< - use_two_accessors, is_key_nullable, is_argument_nullable, KeyT, int>( - key_attr_id, - argument_id, - static_cast<std::atomic<std::int64_t> *>(vec_table), - key_accessor, - argument_accessor); - return; - case TypeID::kLong: - upsertValueAccessorIntegerSum< - use_two_accessors, is_key_nullable, is_argument_nullable, KeyT, std::int64_t>( - key_attr_id, - argument_id, - static_cast<std::atomic<std::int64_t> *>(vec_table), - key_accessor, - argument_accessor); - return; - case TypeID::kFloat: - upsertValueAccessorGenericSum< - use_two_accessors, is_key_nullable, is_argument_nullable, KeyT, float>( - key_attr_id, - argument_id, - static_cast<std::atomic<double> *>(vec_table), - key_accessor, - argument_accessor); - return; - case TypeID::kDouble: - upsertValueAccessorGenericSum< - use_two_accessors, is_key_nullable, is_argument_nullable, KeyT, double>( - key_attr_id, - argument_id, - static_cast<std::atomic<double> *>(vec_table), - key_accessor, - argument_accessor); - return; - default: - LOG(FATAL) << "Not supported"; - } -} - -template <bool is_key_nullable, typename KeyT, typename ValueAccessorT> -inline void CollisionFreeAggregationStateHashTable - ::upsertValueAccessorCountNullary(const attribute_id key_attr_id, - std::atomic<std::size_t> *vec_table, - ValueAccessorT *accessor) { - accessor->beginIteration(); - while (accessor->next()) { - const KeyT *key = static_cast<const KeyT *>( - accessor->template getUntypedValue<is_key_nullable>(key_attr_id)); - if (is_key_nullable && key == nullptr) { - continue; - } - const std::size_t loc = *key; - vec_table[loc].fetch_add(1u, std::memory_order_relaxed); - existence_map_->setBit(loc); - } -} - -template <bool use_two_accessors, bool is_key_nullable, typename KeyT, - typename KeyValueAccessorT, typename ArgumentValueAccessorT> -inline void CollisionFreeAggregationStateHashTable - ::upsertValueAccessorCountUnary(const attribute_id key_attr_id, - const attribute_id argument_id, - std::atomic<std::size_t> *vec_table, - KeyValueAccessorT *key_accessor, - ArgumentValueAccessorT *argument_accessor) { - key_accessor->beginIteration(); - if (use_two_accessors) { - argument_accessor->beginIteration(); - } - while (key_accessor->next()) { - if (use_two_accessors) { - argument_accessor->next(); - } - const KeyT *key = static_cast<const KeyT *>( - key_accessor->template getUntypedValue<is_key_nullable>(key_attr_id)); - if (is_key_nullable && key == nullptr) { - continue; - } - const std::size_t loc = *key; - existence_map_->setBit(loc); - if (argument_accessor->getUntypedValue(argument_id) == nullptr) { - continue; - } - vec_table[loc].fetch_add(1u, std::memory_order_relaxed); - } -} - -template <bool use_two_accessors, bool is_key_nullable, bool is_argument_nullable, - typename KeyT, typename ArgumentT, typename StateT, - typename KeyValueAccessorT, typename ArgumentValueAccessorT> -inline void CollisionFreeAggregationStateHashTable - ::upsertValueAccessorIntegerSum(const attribute_id key_attr_id, - const attribute_id argument_id, - std::atomic<StateT> *vec_table, - KeyValueAccessorT *key_accessor, - ArgumentValueAccessorT *argument_accessor) { - key_accessor->beginIteration(); - if (use_two_accessors) { - argument_accessor->beginIteration(); - } - while (key_accessor->next()) { - if (use_two_accessors) { - argument_accessor->next(); - } - const KeyT *key = static_cast<const KeyT *>( - key_accessor->template getUntypedValue<is_key_nullable>(key_attr_id)); - if (is_key_nullable && key == nullptr) { - continue; - } - const std::size_t loc = *key; - existence_map_->setBit(loc); - const ArgumentT *argument = static_cast<const ArgumentT *>( - argument_accessor->template getUntypedValue<is_argument_nullable>(argument_id)); - if (is_argument_nullable && argument == nullptr) { - continue; - } - vec_table[loc].fetch_add(*argument, std::memory_order_relaxed); - } -} - -template <bool use_two_accessors, bool is_key_nullable, bool is_argument_nullable, - typename KeyT, typename ArgumentT, typename StateT, - typename KeyValueAccessorT, typename ArgumentValueAccessorT> -inline void CollisionFreeAggregationStateHashTable - ::upsertValueAccessorGenericSum(const attribute_id key_attr_id, - const attribute_id argument_id, - std::atomic<StateT> *vec_table, - KeyValueAccessorT *key_accessor, - ArgumentValueAccessorT *argument_accessor) { - key_accessor->beginIteration(); - if (use_two_accessors) { - argument_accessor->beginIteration(); - } - while (key_accessor->next()) { - if (use_two_accessors) { - argument_accessor->next(); - } - const KeyT *key = static_cast<const KeyT *>( - key_accessor->template getUntypedValue<is_key_nullable>(key_attr_id)); - if (is_key_nullable && key == nullptr) { - continue; - } - const std::size_t loc = *key; - existence_map_->setBit(loc); - const ArgumentT *argument = static_cast<const ArgumentT *>( - argument_accessor->template getUntypedValue<is_argument_nullable>(argument_id)); - if (is_argument_nullable && argument == nullptr) { - continue; - } - const ArgumentT arg_val = *argument; - std::atomic<StateT> &state = vec_table[loc]; - StateT state_val = state.load(std::memory_order_relaxed); - while(!state.compare_exchange_weak(state_val, state_val + arg_val)) {} - } -} - -} // namespace quickstep - -#endif // QUICKSTEP_STORAGE_COLLISION_FREE_AGGREGATION_STATE_HASH_TABLE_HPP_ http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/1e7a92a9/storage/CollisionFreeVectorTable.cpp ---------------------------------------------------------------------- diff --git a/storage/CollisionFreeVectorTable.cpp b/storage/CollisionFreeVectorTable.cpp new file mode 100644 index 0000000..8065cd9 --- /dev/null +++ b/storage/CollisionFreeVectorTable.cpp @@ -0,0 +1,283 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + **/ + +#include "storage/CollisionFreeVectorTable.hpp" + +#include <algorithm> +#include <atomic> +#include <cstddef> +#include <cstdint> +#include <cstdlib> +#include <map> +#include <memory> +#include <vector> + +#include "storage/StorageBlockInfo.hpp" +#include "storage/StorageManager.hpp" +#include "storage/ValueAccessor.hpp" +#include "storage/ValueAccessorMultiplexer.hpp" +#include "storage/ValueAccessorUtil.hpp" +#include "types/containers/ColumnVectorsValueAccessor.hpp" +#include "utility/BarrieredReadWriteConcurrentBitVector.hpp" + +namespace quickstep { + +CollisionFreeVectorTable::CollisionFreeVectorTable( + const std::vector<const Type *> &key_types, + const std::size_t num_entries, + const std::vector<AggregationHandle *> &handles, + StorageManager *storage_manager) + : key_type_(key_types.front()), + num_entries_(num_entries), + num_handles_(handles.size()), + handles_(handles), + num_finalize_partitions_(std::min((num_entries_ >> 12u) + 1u, 80uL)), + storage_manager_(storage_manager) { + CHECK_EQ(1u, key_types.size()); + DCHECK_GT(num_entries, 0u); + + std::map<std::string, std::size_t> memory_offsets; + std::size_t required_memory = 0; + + memory_offsets.emplace("existence_map", required_memory); + required_memory += CacheLineAlignedBytes( + BarrieredReadWriteConcurrentBitVector::BytesNeeded(num_entries)); + + for (std::size_t i = 0; i < num_handles_; ++i) { + const AggregationHandle *handle = handles_[i]; + const std::vector<const Type *> argument_types = handle->getArgumentTypes(); + + std::size_t state_size = 0; + switch (handle->getAggregationID()) { + case AggregationID::kCount: { + state_size = sizeof(std::atomic<std::size_t>); + break; + } + case AggregationID::kSum: { + CHECK_EQ(1u, argument_types.size()); + switch (argument_types.front()->getTypeID()) { + case TypeID::kInt: // Fall through + case TypeID::kLong: + state_size = sizeof(std::atomic<std::int64_t>); + break; + case TypeID::kFloat: // Fall through + case TypeID::kDouble: + state_size = sizeof(std::atomic<double>); + break; + default: + LOG(FATAL) << "Not implemented"; + } + break; + } + default: + LOG(FATAL) << "Not implemented"; + } + + memory_offsets.emplace(std::string("state") + std::to_string(i), + required_memory); + required_memory += CacheLineAlignedBytes(state_size * num_entries); + } + + const std::size_t num_storage_slots = + storage_manager_->SlotsNeededForBytes(required_memory); + + const block_id blob_id = storage_manager_->createBlob(num_storage_slots); + blob_ = storage_manager_->getBlobMutable(blob_id); + + void *memory_start = blob_->getMemoryMutable(); + existence_map_.reset(new BarrieredReadWriteConcurrentBitVector( + reinterpret_cast<char *>(memory_start) + memory_offsets.at("existence_map"), + num_entries, + false /* initialize */)); + + for (std::size_t i = 0; i < num_handles_; ++i) { + vec_tables_.emplace_back( + reinterpret_cast<char *>(memory_start) + + memory_offsets.at(std::string("state") + std::to_string(i))); + } + + memory_size_ = required_memory; + num_init_partitions_ = + std::max(1uL, std::min(memory_size_ / (4uL * 1024u * 1024u), 80uL)); +} + +CollisionFreeVectorTable::~CollisionFreeVectorTable() { + const block_id blob_id = blob_->getID(); + blob_.release(); + storage_manager_->deleteBlockOrBlobFile(blob_id); +} + +void CollisionFreeVectorTable::destroyPayload() { +} + +bool CollisionFreeVectorTable::upsertValueAccessor( + const std::vector<std::vector<MultiSourceAttributeId>> &argument_ids, + const std::vector<MultiSourceAttributeId> &key_ids, + ValueAccessorMultiplexer *accessor_mux) { + DCHECK_EQ(1u, key_ids.size()); + + const ValueAccessorSource key_source = key_ids.front().source; + const attribute_id key_id = key_ids.front().attr_id; + const bool is_key_nullable = key_type_->isNullable(); + + if (handles_.empty()) { + InvokeOnValueAccessorMaybeTupleIdSequenceAdapter( + accessor_mux->getValueAccessorBySource(key_source), + [&](auto *accessor) -> void { // NOLINT(build/c++11) + upsertValueAccessorKeyOnlyHelper(is_key_nullable, + key_type_, + key_id, + accessor); + }); + return true; + } + + DCHECK(accessor_mux->getDerivedAccessor()->getImplementationType() + == ValueAccessor::Implementation::kColumnVectors); + ValueAccessor *base_accessor = accessor_mux->getBaseAccessor(); + ColumnVectorsValueAccessor *derived_accesor = + static_cast<ColumnVectorsValueAccessor *>(accessor_mux->getDerivedAccessor()); + + for (std::size_t i = 0; i < num_handles_; ++i) { + DCHECK_LE(argument_ids[i].size(), 1u); + + const AggregationHandle *handle = handles_[i]; + const auto &argument_types = handle->getArgumentTypes(); + const auto &argument_ids_i = argument_ids[i]; + + ValueAccessorSource argument_source; + attribute_id argument_id; + const Type *argument_type; + bool is_argument_nullable; + + if (argument_ids_i.empty()) { + argument_source = ValueAccessorSource::kInvalid; + argument_id = kInvalidAttributeID; + + DCHECK(argument_types.empty()); + argument_type = nullptr; + is_argument_nullable = false; + } else { + DCHECK_EQ(1u, argument_ids_i.size()); + argument_source = argument_ids_i.front().source; + argument_id = argument_ids_i.front().attr_id; + + DCHECK_EQ(1u, argument_types.size()); + argument_type = argument_types.front(); + is_argument_nullable = argument_type->isNullable(); + } + + InvokeOnValueAccessorMaybeTupleIdSequenceAdapter( + base_accessor, + [&](auto *accessor) -> void { // NOLINT(build/c++11) + if (key_source == ValueAccessorSource::kBase) { + if (argument_source == ValueAccessorSource::kBase) { + upsertValueAccessorDispatchHelper<false>(is_key_nullable, + is_argument_nullable, + key_type_, + argument_type, + handle->getAggregationID(), + key_id, + argument_id, + vec_tables_[i], + accessor, + accessor); + } else { + upsertValueAccessorDispatchHelper<true>(is_key_nullable, + is_argument_nullable, + key_type_, + argument_type, + handle->getAggregationID(), + key_id, + argument_id, + vec_tables_[i], + accessor, + derived_accesor); + } + } else { + if (argument_source == ValueAccessorSource::kBase) { + upsertValueAccessorDispatchHelper<true>(is_key_nullable, + is_argument_nullable, + key_type_, + argument_type, + handle->getAggregationID(), + key_id, + argument_id, + vec_tables_[i], + derived_accesor, + accessor); + } else { + upsertValueAccessorDispatchHelper<false>(is_key_nullable, + is_argument_nullable, + key_type_, + argument_type, + handle->getAggregationID(), + key_id, + argument_id, + vec_tables_[i], + derived_accesor, + derived_accesor); + } + } + }); + } + return true; +} + +void CollisionFreeVectorTable::finalizeKey(const std::size_t partition_id, + NativeColumnVector *output_cv) const { + const std::size_t start_position = + calculatePartitionStartPosition(partition_id); + const std::size_t end_position = + calculatePartitionEndPosition(partition_id); + + switch (key_type_->getTypeID()) { + case TypeID::kInt: + finalizeKeyInternal<int>(start_position, end_position, output_cv); + return; + case TypeID::kLong: + finalizeKeyInternal<std::int64_t>(start_position, end_position, output_cv); + return; + default: + LOG(FATAL) << "Not supported"; + } +} + +void CollisionFreeVectorTable::finalizeState(const std::size_t partition_id, + std::size_t handle_id, + NativeColumnVector *output_cv) const { + const std::size_t start_position = + calculatePartitionStartPosition(partition_id); + const std::size_t end_position = + calculatePartitionEndPosition(partition_id); + + const AggregationHandle *handle = handles_[handle_id]; + const auto &argument_types = handle->getArgumentTypes(); + const Type *argument_type = + argument_types.empty() ? nullptr : argument_types.front(); + + finalizeStateDispatchHelper(handle->getAggregationID(), + argument_type, + vec_tables_[handle_id], + start_position, + end_position, + output_cv); +} + +} // namespace quickstep