Use partitioned aggregation for single-function DISTINCT aggregation.
Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/17477f57 Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/17477f57 Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/17477f57 Branch: refs/heads/reorder-partitioned-hash-join Commit: 17477f5756e599b4276d6d366c3144cad0be536f Parents: 4be8e91 Author: Jianqiao Zhu <jianq...@cs.wisc.edu> Authored: Mon Feb 20 20:05:08 2017 -0600 Committer: Jianqiao Zhu <jianq...@cs.wisc.edu> Committed: Fri Feb 24 11:10:56 2017 -0600 ---------------------------------------------------------------------- storage/AggregationOperationState.cpp | 158 ++++++++++++++++++++--------- storage/AggregationOperationState.hpp | 3 + storage/CMakeLists.txt | 3 +- storage/PackedPayloadHashTable.cpp | 33 +++--- storage/PackedPayloadHashTable.hpp | 32 ++++-- utility/TemplateUtil.hpp | 74 +++++++++++++- 6 files changed, 228 insertions(+), 75 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/17477f57/storage/AggregationOperationState.cpp ---------------------------------------------------------------------- diff --git a/storage/AggregationOperationState.cpp b/storage/AggregationOperationState.cpp index 0f39b41..eef2c9d 100644 --- a/storage/AggregationOperationState.cpp +++ b/storage/AggregationOperationState.cpp @@ -19,8 +19,10 @@ #include "storage/AggregationOperationState.hpp" +#include <algorithm> #include <cstddef> #include <cstdint> +#include <functional> #include <memory> #include <string> #include <utility> @@ -87,6 +89,8 @@ AggregationOperationState::AggregationOperationState( is_aggregate_partitioned_(false), predicate_(predicate), is_distinct_(std::move(is_distinct)), + all_distinct_(std::accumulate(is_distinct_.begin(), is_distinct_.end(), + !is_distinct_.empty(), std::logical_and<bool>())), 
storage_manager_(storage_manager) { if (!group_by.empty()) { if (hash_table_impl_type == HashTableImplType::kCollisionFreeVector) { @@ -163,11 +167,6 @@ AggregationOperationState::AggregationOperationState( handles_.emplace_back((*agg_func_it)->createHandle(argument_types)); if (!group_by_key_ids_.empty()) { - // Aggregation with GROUP BY: combined payload is partially updated in - // the presence of DISTINCT. - if (*is_distinct_it) { - handles_.back()->blockUpdate(); - } group_by_handles.emplace_back(handles_.back().get()); } else { // Aggregation without GROUP BY: create a single global state. @@ -180,17 +179,32 @@ AggregationOperationState::AggregationOperationState( std::vector<const Type *> key_types(group_by_types_); key_types.insert( key_types.end(), argument_types.begin(), argument_types.end()); + // TODO(jianqiao): estimated_num_entries is quite inaccurate for // estimating the number of entries in the distinctify hash table. // We need to estimate for each distinct aggregation an // estimated_num_distinct_keys value during query optimization. - distinctify_hashtables_.emplace_back( - AggregationStateHashTableFactory::CreateResizable( - *distinctify_hash_table_impl_types_it, - key_types, - estimated_num_entries, - {} /* handles */, - storage_manager)); + if (is_aggregate_partitioned_) { + DCHECK(partitioned_group_by_hashtable_pool_ == nullptr); + partitioned_group_by_hashtable_pool_.reset( + new PartitionedHashTablePool(estimated_num_entries, + FLAGS_num_aggregation_partitions, + *distinctify_hash_table_impl_types_it, + key_types, + {}, + storage_manager)); + } else { + distinctify_hashtables_.emplace_back( + AggregationStateHashTableFactory::CreateResizable( + *distinctify_hash_table_impl_types_it, + key_types, + estimated_num_entries, + {} /* handles */, + storage_manager)); + + // Combined payload is partially updated in the presence of DISTINCT. 
+ handles_.back()->blockUpdate(); + } ++distinctify_hash_table_impl_types_it; } else { distinctify_hashtables_.emplace_back(nullptr); @@ -208,13 +222,24 @@ AggregationOperationState::AggregationOperationState( group_by_handles, storage_manager)); } else if (is_aggregate_partitioned_) { - partitioned_group_by_hashtable_pool_.reset( - new PartitionedHashTablePool(estimated_num_entries, - FLAGS_num_aggregation_partitions, - hash_table_impl_type, - group_by_types_, - group_by_handles, - storage_manager)); + if (all_distinct_) { + DCHECK_EQ(1u, group_by_handles.size()); + DCHECK(partitioned_group_by_hashtable_pool_ != nullptr); + group_by_hashtable_pool_.reset( + new HashTablePool(estimated_num_entries, + hash_table_impl_type, + group_by_types_, + group_by_handles, + storage_manager)); + } else { + partitioned_group_by_hashtable_pool_.reset( + new PartitionedHashTablePool(estimated_num_entries, + FLAGS_num_aggregation_partitions, + hash_table_impl_type, + group_by_types_, + group_by_handles, + storage_manager)); + } } else { group_by_hashtable_pool_.reset( new HashTablePool(estimated_num_entries, @@ -362,11 +387,13 @@ bool AggregationOperationState::checkAggregatePartitioned( if (aggregate_functions.empty()) { return false; } - // Check if there's a distinct operation involved in any aggregate, if so - // the aggregate can't be partitioned. - for (auto distinct : is_distinct) { - if (distinct) { - return false; + // If there is only one aggregate function, we allow distinct aggregation. + // Otherwise it can't be partitioned with distinct aggregation. + if (aggregate_functions.size() > 1) { + for (auto distinct : is_distinct) { + if (distinct) { + return false; + } } } // There's no distinct aggregation involved, Check if there's at least one @@ -384,12 +411,17 @@ bool AggregationOperationState::checkAggregatePartitioned( } } + // Currently we always use partitioned aggregation to parallelize distinct + // aggregation. 
+ if (all_distinct_) { + return true; + } + // There are GROUP BYs without DISTINCT. Check if the estimated number of // groups is large enough to warrant a partitioned aggregation. return estimated_num_groups > static_cast<std::size_t>( FLAGS_partition_aggregation_num_groups_threshold); - return false; } std::size_t AggregationOperationState::getNumInitializationPartitions() const { @@ -599,10 +631,19 @@ void AggregationOperationState::aggregateBlockHashTableImplPartitioned( } ValueAccessorMultiplexer local_mux(base_adapter.get(), derived_adapter.get()); - partitioned_group_by_hashtable_pool_->getHashTable(partition) - ->upsertValueAccessorCompositeKey(argument_ids_, - group_by_key_ids_, - local_mux); + if (all_distinct_) { + DCHECK_EQ(1u, handles_.size()); + handles_.front()->insertValueAccessorIntoDistinctifyHashTable( + argument_ids_.front(), + group_by_key_ids_, + local_mux, + partitioned_group_by_hashtable_pool_->getHashTable(partition)); + } else { + partitioned_group_by_hashtable_pool_->getHashTable(partition) + ->upsertValueAccessorCompositeKey(argument_ids_, + group_by_key_ids_, + local_mux); + } } }); } @@ -621,13 +662,15 @@ void AggregationOperationState::aggregateBlockHashTableImplThreadPrivate( } } - AggregationStateHashTableBase *agg_hash_table = - group_by_hashtable_pool_->getHashTable(); + if (!all_distinct_) { + AggregationStateHashTableBase *agg_hash_table = + group_by_hashtable_pool_->getHashTable(); - agg_hash_table->upsertValueAccessorCompositeKey(argument_ids_, - group_by_key_ids_, - accessor_mux); - group_by_hashtable_pool_->returnHashTable(agg_hash_table); + agg_hash_table->upsertValueAccessorCompositeKey(argument_ids_, + group_by_key_ids_, + accessor_mux); + group_by_hashtable_pool_->returnHashTable(agg_hash_table); + } } void AggregationOperationState::finalizeAggregate( @@ -711,10 +754,24 @@ void AggregationOperationState::finalizeHashTableImplCollisionFree( void AggregationOperationState::finalizeHashTableImplPartitioned( const 
std::size_t partition_id, InsertDestination *output_destination) { - PackedPayloadHashTable *hash_table = + PackedPayloadHashTable *partitioned_hash_table = static_cast<PackedPayloadHashTable *>( partitioned_group_by_hashtable_pool_->getHashTable(partition_id)); + PackedPayloadHashTable *hash_table; + if (all_distinct_) { + DCHECK_EQ(1u, handles_.size()); + DCHECK(group_by_hashtable_pool_ != nullptr); + + hash_table = static_cast<PackedPayloadHashTable *>( + group_by_hashtable_pool_->getHashTable()); + handles_.front()->aggregateOnDistinctifyHashTableForGroupBy( + *partitioned_hash_table, 0, hash_table); + partitioned_hash_table->destroyPayload(); + } else { + hash_table = partitioned_hash_table; + } + // Each element of 'group_by_keys' is a vector of values for a particular // group (which is also the prefix of the finalized Tuple for that group). std::vector<std::vector<TypedValue>> group_by_keys; @@ -790,19 +847,24 @@ void AggregationOperationState::finalizeHashTableImplThreadPrivate( // TODO(harshad) - Find heuristics for faster merge, even in a single thread. // e.g. Keep merging entries from smaller hash tables to larger. 
- auto *hash_tables = group_by_hashtable_pool_->getAllHashTables(); - DCHECK(hash_tables != nullptr); - if (hash_tables->empty()) { - return; - } + std::unique_ptr<AggregationStateHashTableBase> final_hash_table_ptr; - std::unique_ptr<AggregationStateHashTableBase> final_hash_table_ptr( - hash_tables->back().release()); - for (std::size_t i = 0; i < hash_tables->size() - 1; ++i) { - std::unique_ptr<AggregationStateHashTableBase> hash_table( - hash_tables->at(i).release()); - mergeGroupByHashTables(hash_table.get(), final_hash_table_ptr.get()); - hash_table->destroyPayload(); + if (all_distinct_) { + final_hash_table_ptr.reset(group_by_hashtable_pool_->getHashTable()); + } else { + auto *hash_tables = group_by_hashtable_pool_->getAllHashTables(); + DCHECK(hash_tables != nullptr); + if (hash_tables->empty()) { + return; + } + + final_hash_table_ptr.reset(hash_tables->back().release()); + for (std::size_t i = 0; i < hash_tables->size() - 1; ++i) { + std::unique_ptr<AggregationStateHashTableBase> hash_table( + hash_tables->at(i).release()); + mergeGroupByHashTables(hash_table.get(), final_hash_table_ptr.get()); + hash_table->destroyPayload(); + } } PackedPayloadHashTable *final_hash_table = http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/17477f57/storage/AggregationOperationState.hpp ---------------------------------------------------------------------- diff --git a/storage/AggregationOperationState.hpp b/storage/AggregationOperationState.hpp index c8930ee..6c9690a 100644 --- a/storage/AggregationOperationState.hpp +++ b/storage/AggregationOperationState.hpp @@ -273,6 +273,9 @@ class AggregationOperationState { // arguments. std::vector<bool> is_distinct_; + // A flag indicating whether all aggregate functions are DISTINCT aggregations. + const bool all_distinct_; + // Non-trivial group-by/argument expressions that need to be evaluated. 
std::vector<std::unique_ptr<const Scalar>> non_trivial_expressions_; http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/17477f57/storage/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/storage/CMakeLists.txt b/storage/CMakeLists.txt index 293be17..8b68150 100644 --- a/storage/CMakeLists.txt +++ b/storage/CMakeLists.txt @@ -817,7 +817,8 @@ target_link_libraries(quickstep_storage_PackedPayloadHashTable quickstep_utility_Alignment quickstep_utility_HashPair quickstep_utility_Macros - quickstep_utility_PrimeNumber) + quickstep_utility_PrimeNumber + quickstep_utility_TemplateUtil) target_link_libraries(quickstep_storage_PartitionedHashTablePool glog quickstep_storage_HashTableBase http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/17477f57/storage/PackedPayloadHashTable.cpp ---------------------------------------------------------------------- diff --git a/storage/PackedPayloadHashTable.cpp b/storage/PackedPayloadHashTable.cpp index bf5eaee..3d672f2 100644 --- a/storage/PackedPayloadHashTable.cpp +++ b/storage/PackedPayloadHashTable.cpp @@ -40,6 +40,7 @@ #include "utility/Alignment.hpp" #include "utility/Macros.hpp" #include "utility/PrimeNumber.hpp" +#include "utility/TemplateUtil.hpp" #include "glog/logging.h" @@ -234,23 +235,31 @@ bool PackedPayloadHashTable::upsertValueAccessorCompositeKey( ValueAccessor *base_accessor = accessor_mux.getBaseAccessor(); ValueAccessor *derived_accessor = accessor_mux.getDerivedAccessor(); + const bool has_derived_accessor = (derived_accessor != nullptr); + base_accessor->beginIterationVirtual(); - if (derived_accessor == nullptr) { - return upsertValueAccessorCompositeKeyInternal<false>( - argument_ids, - key_attr_ids, - base_accessor, - nullptr); - } else { + if (has_derived_accessor) { DCHECK(derived_accessor->getImplementationType() == ValueAccessor::Implementation::kColumnVectors); derived_accessor->beginIterationVirtual(); - return 
upsertValueAccessorCompositeKeyInternal<true>( - argument_ids, - key_attr_ids, - base_accessor, - static_cast<ColumnVectorsValueAccessor *>(derived_accessor)); } + + return InvokeOnBools( + has_derived_accessor, + handles_.empty(), + !all_keys_inline_, + [&](auto use_two_accessors, // NOLINT(build/c++11) + auto key_only, + auto has_variable_size) -> bool { + return upsertValueAccessorCompositeKeyInternal< + decltype(use_two_accessors)::value, + decltype(key_only)::value, + decltype(has_variable_size)::value>( + argument_ids, + key_attr_ids, + base_accessor, + static_cast<ColumnVectorsValueAccessor *>(derived_accessor)); + }); } void PackedPayloadHashTable::resize(const std::size_t extra_buckets, http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/17477f57/storage/PackedPayloadHashTable.hpp ---------------------------------------------------------------------- diff --git a/storage/PackedPayloadHashTable.hpp b/storage/PackedPayloadHashTable.hpp index f87a1de..c49bdb4 100644 --- a/storage/PackedPayloadHashTable.hpp +++ b/storage/PackedPayloadHashTable.hpp @@ -20,10 +20,12 @@ #ifndef QUICKSTEP_STORAGE_PACKED_PAYLOAD_HASH_TABLE_HPP_ #define QUICKSTEP_STORAGE_PACKED_PAYLOAD_HASH_TABLE_HPP_ +#include <algorithm> #include <atomic> #include <cstddef> #include <cstdint> #include <cstring> +#include <functional> #include <limits> #include <vector> @@ -336,11 +338,12 @@ class PackedPayloadHashTable : public AggregationStateHashTableBase { const std::uint8_t **value, std::size_t *entry_num) const; + template <bool key_only = false> inline std::uint8_t* upsertCompositeKeyInternal( const std::vector<TypedValue> &key, const std::size_t variable_key_size); - template <bool use_two_accessors> + template <bool use_two_accessors, bool key_only, bool has_variable_size> inline bool upsertValueAccessorCompositeKeyInternal( const std::vector<std::vector<MultiSourceAttributeId>> &argument_ids, const std::vector<MultiSourceAttributeId> &key_ids, @@ -355,8 +358,9 @@ class 
PackedPayloadHashTable : public AggregationStateHashTableBase { // comes from a HashTableKeyManager, and is set by the constructor of a // subclass of HashTable. inline void setKeyInline(const std::vector<bool> *key_inline) { - scalar_key_inline_ = key_inline->front(); key_inline_ = key_inline; + all_keys_inline_ = std::accumulate(key_inline_->begin(), key_inline_->end(), + true, std::logical_and<bool>()); } inline static std::size_t ComputeTotalPayloadSize( @@ -407,7 +411,7 @@ class PackedPayloadHashTable : public AggregationStateHashTableBase { // Information about whether key components are stored inline or in a // separate variable-length storage region. This is usually determined by a // HashTableKeyManager and set by calling setKeyInline(). - bool scalar_key_inline_; + bool all_keys_inline_; const std::vector<bool> *key_inline_; const std::size_t num_handles_; @@ -763,7 +767,7 @@ inline bool PackedPayloadHashTable::upsertCompositeKey( } } - +template <bool key_only> inline std::uint8_t* PackedPayloadHashTable::upsertCompositeKeyInternal( const std::vector<TypedValue> &key, const std::size_t variable_key_size) { @@ -809,7 +813,9 @@ inline std::uint8_t* PackedPayloadHashTable::upsertCompositeKeyInternal( writeCompositeKeyToBucket(key, hash_code, bucket); std::uint8_t *value = static_cast<unsigned char *>(bucket) + kValueOffset; - std::memcpy(value, init_payload_, this->total_payload_size_); + if (!key_only) { + std::memcpy(value, init_payload_, this->total_payload_size_); + } // Update the previous chaing pointer to point to the new bucket. 
pending_chain_ptr->store(pending_chain_ptr_finish_value, @@ -819,13 +825,13 @@ inline std::uint8_t* PackedPayloadHashTable::upsertCompositeKeyInternal( return value; } -template <bool use_two_accessors> +template <bool use_two_accessors, bool key_only, bool has_variable_size> inline bool PackedPayloadHashTable::upsertValueAccessorCompositeKeyInternal( const std::vector<std::vector<MultiSourceAttributeId>> &argument_ids, const std::vector<MultiSourceAttributeId> &key_ids, ValueAccessor *base_accessor, ColumnVectorsValueAccessor *derived_accessor) { - std::size_t variable_size; + std::size_t variable_size = 0; std::vector<TypedValue> key_vector; key_vector.resize(key_ids.size()); @@ -848,13 +854,17 @@ inline bool PackedPayloadHashTable::upsertValueAccessorCompositeKeyInternal( &key_vector)) { continue; } - variable_size = this->calculateVariableLengthCompositeKeyCopySize(key_vector); - std::uint8_t *value = this->upsertCompositeKeyInternal( - key_vector, variable_size); + if (has_variable_size) { + variable_size = + this->calculateVariableLengthCompositeKeyCopySize(key_vector); + } + std::uint8_t *value = + this->template upsertCompositeKeyInternal<key_only>( + key_vector, variable_size); if (value == nullptr) { continuing = true; break; - } else { + } else if (!key_only) { SpinMutexLock lock(*(reinterpret_cast<SpinMutex *>(value))); for (unsigned int k = 0; k < num_handles_; ++k) { const auto &ids = argument_ids[k]; http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/17477f57/utility/TemplateUtil.hpp ---------------------------------------------------------------------- diff --git a/utility/TemplateUtil.hpp b/utility/TemplateUtil.hpp index 33e4f42..dfae8e4 100644 --- a/utility/TemplateUtil.hpp +++ b/utility/TemplateUtil.hpp @@ -30,6 +30,8 @@ namespace quickstep { * @{ */ +namespace template_util_inner { + /** * @brief Represents a compile-time sequence of integers. 
* @@ -58,7 +60,6 @@ struct MakeSequence<0, S...> { typedef Sequence<S...> type; }; - /** * @brief Final step of CreateBoolInstantiatedInstance. Now all bool_values are * ready. Instantiate the template and create (i.e. new) an instance. @@ -72,6 +73,42 @@ inline ReturnT* CreateBoolInstantiatedInstanceInner(Tuple &&args, } /** + * @brief Invoke the functor with the compile-time bool values wrapped as + * integral_constant types. + */ +template <typename FunctorT, bool ...bool_values> +inline auto InvokeOnBoolsInner(const FunctorT &functor) { + return functor(std::integral_constant<bool, bool_values>()...); +} + +/** + * @brief Recursive dispatching. + */ +template <typename FunctorT, bool ...bool_values, typename ...Bools> +inline auto InvokeOnBoolsInner(const FunctorT &functor, + const bool tparam, + const Bools ...rest_params) { + if (tparam) { + return InvokeOnBoolsInner<FunctorT, bool_values..., true>( + functor, rest_params...); + } else { + return InvokeOnBoolsInner<FunctorT, bool_values..., false>( + functor, rest_params...); + } +} + +/** + * @brief Move the functor to the first position in argument list. + */ +template <std::size_t last, std::size_t ...i, typename TupleT> +inline auto InvokeOnBoolsInner(TupleT &&args, Sequence<i...> &&indices) { + return InvokeOnBoolsInner(std::get<last>(std::forward<TupleT>(args)), + std::get<i>(std::forward<TupleT>(args))...); +} + +} // namespace template_util_inner + +/** * @brief Edge case of the recursive CreateBoolInstantiatedInstance function * when all bool variables have been branched and replaced with compile-time * bool constants. @@ -85,8 +122,10 @@ inline ReturnT* CreateBoolInstantiatedInstance(Tuple &&args) { // for the tuple, so that the tuple can be unpacked as a sequence of constructor // parameters in CreateBoolInstantiatedInstanceInner. 
constexpr std::size_t n_args = std::tuple_size<Tuple>::value; - return CreateBoolInstantiatedInstanceInner<T, ReturnT, bool_values...>( - std::forward<Tuple>(args), typename MakeSequence<n_args>::type()); + return template_util_inner::CreateBoolInstantiatedInstanceInner< + T, ReturnT, bool_values...>( + std::forward<Tuple>(args), + typename template_util_inner::MakeSequence<n_args>::type()); } /** @@ -160,6 +199,35 @@ inline ReturnT* CreateBoolInstantiatedInstance(Tuple &&args, } } +/** + * @brief A helper function for bool branched template specialization. + * + * Usage example: + * -- + * bool c1 = true, c2 = false; + * + * InvokeOnBools( + * c1, c2, + * [&](auto c1, auto c2) -> SomeBaseClass* { + * using T1 = decltype(c1); // T1 == std::true_type + * using T2 = decltype(c2); // T2 == std::false_type + * + * constexpr bool cv1 = T1::value; // cv1 == true + * constexpr bool cv2 = T2::value; // cv2 == false + * + * SomeFunction<cv1, cv2>(...); + * return new SomeClass<cv1, cv2>(...); + * }); + * -- + */ +template <typename ...ArgTypes> +inline auto InvokeOnBools(ArgTypes ...args) { + constexpr std::size_t last = sizeof...(args) - 1; + return template_util_inner::InvokeOnBoolsInner<last>( + std::forward_as_tuple(args...), + typename template_util_inner::MakeSequence<last>::type()); +} + /** @} */ } // namespace quickstep