Updates

Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/1e7a92a9
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/1e7a92a9
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/1e7a92a9

Branch: refs/heads/collision-free-agg
Commit: 1e7a92a94e0076d89151ea4a2ab4f68caa0572c0
Parents: 3bcb5c8
Author: Jianqiao Zhu <jianq...@cs.wisc.edu>
Authored: Fri Feb 3 22:44:37 2017 -0600
Committer: Jianqiao Zhu <jianq...@cs.wisc.edu>
Committed: Fri Feb 3 22:44:37 2017 -0600

----------------------------------------------------------------------
 .../aggregation/AggregationConcreteHandle.cpp   |   4 +-
 .../aggregation/AggregationConcreteHandle.hpp   |  21 +-
 expressions/aggregation/CMakeLists.txt          |   2 +-
 storage/AggregationOperationState.cpp           |  31 +-
 storage/CMakeLists.txt                          |  24 +-
 .../CollisionFreeAggregationStateHashTable.cpp  | 285 -------
 .../CollisionFreeAggregationStateHashTable.hpp  | 621 --------------
 storage/CollisionFreeVectorTable.cpp            | 283 +++++++
 storage/CollisionFreeVectorTable.hpp            | 621 ++++++++++++++
 storage/HashTableFactory.hpp                    |   8 +-
 .../PackedPayloadAggregationStateHashTable.cpp  | 439 ----------
 .../PackedPayloadAggregationStateHashTable.hpp  | 805 -------------------
 storage/PackedPayloadHashTable.cpp              | 436 ++++++++++
 storage/PackedPayloadHashTable.hpp              | 798 ++++++++++++++++++
 14 files changed, 2180 insertions(+), 2198 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/1e7a92a9/expressions/aggregation/AggregationConcreteHandle.cpp
----------------------------------------------------------------------
diff --git a/expressions/aggregation/AggregationConcreteHandle.cpp b/expressions/aggregation/AggregationConcreteHandle.cpp
index 5fd7e0f..bbce29f 100644
--- a/expressions/aggregation/AggregationConcreteHandle.cpp
+++ b/expressions/aggregation/AggregationConcreteHandle.cpp
@@ -24,7 +24,7 @@
 
 #include "catalog/CatalogTypedefs.hpp"
 #include "storage/HashTableFactory.hpp"
-#include "storage/PackedPayloadAggregationStateHashTable.hpp"
+#include "storage/PackedPayloadHashTable.hpp"
 #include "storage/ValueAccessorMultiplexer.hpp"
 
 namespace quickstep {
@@ -57,7 +57,7 @@ void AggregationConcreteHandle::insertValueAccessorIntoDistinctifyHashTable(
     concatenated_ids.emplace_back(arg_id);
   }
 
-  static_cast<PackedPayloadSeparateChainingAggregationStateHashTable *>(distinctify_hash_table)
+  static_cast<PackedPayloadHashTable *>(distinctify_hash_table)
       ->upsertValueAccessor({}, concatenated_ids, accessor_mux);
 }
 

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/1e7a92a9/expressions/aggregation/AggregationConcreteHandle.hpp
----------------------------------------------------------------------
diff --git a/expressions/aggregation/AggregationConcreteHandle.hpp b/expressions/aggregation/AggregationConcreteHandle.hpp
index 5b49d0d..c8d61ff 100644
--- a/expressions/aggregation/AggregationConcreteHandle.hpp
+++ b/expressions/aggregation/AggregationConcreteHandle.hpp
@@ -29,7 +29,7 @@
 #include "expressions/aggregation/AggregationHandle.hpp"
 #include "expressions/aggregation/AggregationID.hpp"
 #include "storage/HashTableBase.hpp"
-#include "storage/PackedPayloadAggregationStateHashTable.hpp"
+#include "storage/PackedPayloadHashTable.hpp"
 #include "storage/ValueAccessorMultiplexer.hpp"
 #include "threading/SpinMutex.hpp"
 #include "types/TypedValue.hpp"
@@ -151,7 +151,7 @@ class AggregationConcreteHandle : public AggregationHandle {
       const std::size_t index,
       const std::vector<TypedValue> &group_key) const {
     const std::uint8_t *group_state =
-        static_cast<const PackedPayloadSeparateChainingAggregationStateHashTable &>(hash_table)
+        static_cast<const PackedPayloadHashTable &>(hash_table)
             .getSingleCompositeKey(group_key, index);
     DCHECK(group_state != nullptr)
         << "Could not find entry for specified group_key in HashTable";
@@ -217,8 +217,7 @@ StateT* AggregationConcreteHandle::
   };
 
   const auto &hash_table =
-      static_cast<const PackedPayloadSeparateChainingAggregationStateHashTable &>(
-          distinctify_hash_table);
+      static_cast<const PackedPayloadHashTable &>(distinctify_hash_table);
   // Invoke the lambda function "aggregate_functor" on each key from the
   // distinctify hash table.
   hash_table.forEach(&aggregate_functor);
@@ -233,9 +232,8 @@ void AggregationConcreteHandle::
         const std::size_t index,
         AggregationStateHashTableBase *aggregation_hash_table) const {
   const HandleT &handle = static_cast<const HandleT &>(*this);
-  PackedPayloadSeparateChainingAggregationStateHashTable *target_hash_table =
-      static_cast<PackedPayloadSeparateChainingAggregationStateHashTable *>(
-          aggregation_hash_table);
+  PackedPayloadHashTable *target_hash_table =
+      static_cast<PackedPayloadHashTable *>(aggregation_hash_table);
 
   // A lambda function which will be called on each key-value pair from the
   // distinctify hash table.
@@ -256,9 +254,8 @@ void AggregationConcreteHandle::
     target_hash_table->upsertCompositeKey(key, &upserter, index);
   };
 
-  const PackedPayloadSeparateChainingAggregationStateHashTable &source_hash_table =
-      static_cast<const PackedPayloadSeparateChainingAggregationStateHashTable &>(
-          distinctify_hash_table);
+  const PackedPayloadHashTable &source_hash_table =
+      static_cast<const PackedPayloadHashTable &>(distinctify_hash_table);
   // Invoke the lambda function "aggregate_functor" on each composite key vector
   // from the distinctify hash table.
   source_hash_table.forEachCompositeKey(&aggregate_functor);
@@ -271,8 +268,8 @@ ColumnVector* AggregationConcreteHandle::finalizeHashTableHelper(
     const std::size_t index,
     std::vector<std::vector<TypedValue>> *group_by_keys) const {
   const HandleT &handle = static_cast<const HandleT &>(*this);
-  const PackedPayloadSeparateChainingAggregationStateHashTable &hash_table_concrete =
-      static_cast<const PackedPayloadSeparateChainingAggregationStateHashTable &>(hash_table);
+  const PackedPayloadHashTable &hash_table_concrete =
+      static_cast<const PackedPayloadHashTable &>(hash_table);
 
   if (group_by_keys->empty()) {
     if (NativeColumnVector::UsableForType(result_type)) {

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/1e7a92a9/expressions/aggregation/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/expressions/aggregation/CMakeLists.txt b/expressions/aggregation/CMakeLists.txt
index 0816db3..7203c8c 100644
--- a/expressions/aggregation/CMakeLists.txt
+++ b/expressions/aggregation/CMakeLists.txt
@@ -146,7 +146,7 @@ target_link_libraries(quickstep_expressions_aggregation_AggregationConcreteHandl
                       quickstep_expressions_aggregation_AggregationID
                       quickstep_storage_HashTableBase
                       quickstep_storage_HashTableFactory
-                      quickstep_storage_PackedPayloadAggregationStateHashTable
+                      quickstep_storage_PackedPayloadHashTable
                       quickstep_storage_ValueAccessorMultiplexer
                       quickstep_threading_SpinMutex
                       quickstep_types_TypedValue

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/1e7a92a9/storage/AggregationOperationState.cpp
----------------------------------------------------------------------
diff --git a/storage/AggregationOperationState.cpp b/storage/AggregationOperationState.cpp
index a393185..4ffd418 100644
--- a/storage/AggregationOperationState.cpp
+++ b/storage/AggregationOperationState.cpp
@@ -38,10 +38,11 @@
 #include "expressions/predicate/Predicate.hpp"
 #include "expressions/scalar/Scalar.hpp"
 #include "storage/AggregationOperationState.pb.h"
+#include "storage/CollisionFreeVectorTable.hpp"
 #include "storage/HashTableFactory.hpp"
 #include "storage/HashTableBase.hpp"
 #include "storage/InsertDestination.hpp"
-#include "storage/PackedPayloadAggregationStateHashTable.hpp"
+#include "storage/PackedPayloadHashTable.hpp"
 #include "storage/StorageBlock.hpp"
 #include "storage/StorageBlockInfo.hpp"
 #include "storage/StorageManager.hpp"
@@ -353,7 +354,7 @@ bool AggregationOperationState::ProtoIsValid(
 
 std::size_t AggregationOperationState::getNumPartitions() const {
   if (is_aggregate_collision_free_) {
-    return static_cast<CollisionFreeAggregationStateHashTable *>(
+    return static_cast<CollisionFreeVectorTable *>(
         collision_free_hashtable_.get())->getNumFinalizationPartitions();
   } else if (is_aggregate_partitioned_) {
     return partitioned_group_by_hashtable_pool_->getNumPartitions();
@@ -364,7 +365,7 @@ std::size_t AggregationOperationState::getNumPartitions() const {
 
 std::size_t AggregationOperationState::getNumInitializationPartitions() const {
   if (is_aggregate_collision_free_) {
-    return static_cast<CollisionFreeAggregationStateHashTable *>(
+    return static_cast<CollisionFreeVectorTable *>(
         collision_free_hashtable_.get())->getNumInitializationPartitions();
   } else {
     return 0u;
@@ -373,7 +374,7 @@ std::size_t AggregationOperationState::getNumInitializationPartitions() const {
 
 void AggregationOperationState::initializeState(const std::size_t partition_id) {
   if (is_aggregate_collision_free_) {
-    static_cast<CollisionFreeAggregationStateHashTable *>(
+    static_cast<CollisionFreeVectorTable *>(
         collision_free_hashtable_.get())->initialize(partition_id);
   } else {
     LOG(FATAL) << "AggregationOperationState::initializeState() "
@@ -512,10 +513,10 @@ void AggregationOperationState::mergeSingleState(
 }
 
 void AggregationOperationState::mergeGroupByHashTables(
-    AggregationStateHashTableBase *src, AggregationStateHashTableBase *dst) const {
-  HashTableMerger merger(dst);
-  static_cast<PackedPayloadSeparateChainingAggregationStateHashTable *>(src)
-      ->forEachCompositeKey(&merger);
+    AggregationStateHashTableBase *src,
+    AggregationStateHashTableBase *dst) const {
+  HashTableMerger merger(static_cast<PackedPayloadHashTable *>(dst));
+  static_cast<PackedPayloadHashTable *>(src)->forEachCompositeKey(&merger);
 }
 
 void AggregationOperationState::aggregateBlockHashTable(
@@ -661,9 +662,8 @@ void AggregationOperationState::finalizeHashTableImplCollisionFree(
     const std::size_t partition_id,
     InsertDestination *output_destination) {
   std::vector<std::unique_ptr<ColumnVector>> final_values;
-  CollisionFreeAggregationStateHashTable *hash_table =
-      static_cast<CollisionFreeAggregationStateHashTable *>(
-          collision_free_hashtable_.get());
+  CollisionFreeVectorTable *hash_table =
+      static_cast<CollisionFreeVectorTable *>(collision_free_hashtable_.get());
 
   // TODO
   const std::size_t max_length =
@@ -696,8 +696,8 @@ void AggregationOperationState::finalizeHashTableImplCollisionFree(
 void AggregationOperationState::finalizeHashTableImplPartitioned(
     const std::size_t partition_id,
     InsertDestination *output_destination) {
-  PackedPayloadSeparateChainingAggregationStateHashTable *hash_table =
-      static_cast<PackedPayloadSeparateChainingAggregationStateHashTable *>(
+  PackedPayloadHashTable *hash_table =
+      static_cast<PackedPayloadHashTable *>(
           partitioned_group_by_hashtable_pool_->getHashTable(partition_id));
 
   // Each element of 'group_by_keys' is a vector of values for a particular
@@ -790,9 +790,8 @@ void AggregationOperationState::finalizeHashTableImplThreadPrivate(
     hash_table->destroyPayload();
   }
 
-  PackedPayloadSeparateChainingAggregationStateHashTable *final_hash_table =
-      static_cast<PackedPayloadSeparateChainingAggregationStateHashTable *>(
-          final_hash_table_ptr.get());
+  PackedPayloadHashTable *final_hash_table =
+      static_cast<PackedPayloadHashTable *>(final_hash_table_ptr.get());
 
   // Each element of 'group_by_keys' is a vector of values for a particular
   // group (which is also the prefix of the finalized Tuple for that group).

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/1e7a92a9/storage/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/storage/CMakeLists.txt b/storage/CMakeLists.txt
index d43d7a2..8fbb4ea 100644
--- a/storage/CMakeLists.txt
+++ b/storage/CMakeLists.txt
@@ -165,9 +165,9 @@ if(QUICKSTEP_HAVE_BITWEAVING)
               bitweaving/BitWeavingVIndexSubBlock.hpp)
 endif()
 # CMAKE_VALIDATE_IGNORE_END
-add_library(quickstep_storage_CollisionFreeAggregationStateHashTable
-            CollisionFreeAggregationStateHashTable.cpp
-            CollisionFreeAggregationStateHashTable.hpp)
+add_library(quickstep_storage_CollisionFreeVectorTable
+            CollisionFreeVectorTable.cpp
+            CollisionFreeVectorTable.hpp)
 add_library(quickstep_storage_ColumnStoreUtil ColumnStoreUtil.cpp ColumnStoreUtil.hpp)
 add_library(quickstep_storage_CompressedBlockBuilder CompressedBlockBuilder.cpp CompressedBlockBuilder.hpp)
 add_library(quickstep_storage_CompressedColumnStoreTupleStorageSubBlock
@@ -225,9 +225,7 @@ add_library(quickstep_storage_InsertDestination_proto
 add_library(quickstep_storage_LinearOpenAddressingHashTable
             ../empty_src.cpp
             LinearOpenAddressingHashTable.hpp)
-add_library(quickstep_storage_PackedPayloadAggregationStateHashTable
-            PackedPayloadAggregationStateHashTable.cpp
-            PackedPayloadAggregationStateHashTable.hpp)
+add_library(quickstep_storage_PackedPayloadHashTable PackedPayloadHashTable.cpp PackedPayloadHashTable.hpp)
 add_library(quickstep_storage_PartitionedHashTablePool ../empty_src.cpp PartitionedHashTablePool.hpp)
 add_library(quickstep_storage_PreloaderThread PreloaderThread.cpp PreloaderThread.hpp)
 add_library(quickstep_storage_SMAIndexSubBlock SMAIndexSubBlock.cpp SMAIndexSubBlock.hpp)
@@ -284,7 +282,7 @@ target_link_libraries(quickstep_storage_AggregationOperationState
                       quickstep_storage_HashTablePool
                       quickstep_storage_InsertDestination
                       quickstep_storage_PartitionedHashTablePool
-                      quickstep_storage_PackedPayloadAggregationStateHashTable
+                      quickstep_storage_PackedPayloadHashTable
                       quickstep_storage_StorageBlock
                       quickstep_storage_StorageBlockInfo
                       quickstep_storage_StorageManager
@@ -435,7 +433,7 @@ if(QUICKSTEP_HAVE_BITWEAVING)
                         quickstep_utility_Macros)
 endif()
 # CMAKE_VALIDATE_IGNORE_END
-target_link_libraries(quickstep_storage_CollisionFreeAggregationStateHashTable
+target_link_libraries(quickstep_storage_CollisionFreeVectorTable
                       quickstep_catalog_CatalogTypedefs
                       quickstep_expressions_aggregation_AggregationHandle
                       quickstep_expressions_aggregation_AggregationID
@@ -714,12 +712,12 @@ target_link_libraries(quickstep_storage_HashTable_proto
                       ${PROTOBUF_LIBRARY})
 target_link_libraries(quickstep_storage_HashTableFactory
                       glog
-                      quickstep_storage_CollisionFreeAggregationStateHashTable
+                      quickstep_storage_CollisionFreeVectorTable
                       quickstep_storage_HashTable
                       quickstep_storage_HashTable_proto
                       quickstep_storage_HashTableBase
                       quickstep_storage_LinearOpenAddressingHashTable
-                      quickstep_storage_PackedPayloadAggregationStateHashTable
+                      quickstep_storage_PackedPayloadHashTable
                       quickstep_storage_SeparateChainingHashTable
                       quickstep_storage_SimpleScalarSeparateChainingHashTable
                       quickstep_storage_TupleReference
@@ -798,7 +796,7 @@ target_link_libraries(quickstep_storage_LinearOpenAddressingHashTable
                       quickstep_utility_Alignment
                       quickstep_utility_Macros
                       quickstep_utility_PrimeNumber)
-target_link_libraries(quickstep_storage_PackedPayloadAggregationStateHashTable
+target_link_libraries(quickstep_storage_PackedPayloadHashTable
                       quickstep_catalog_CatalogTypedefs
                       quickstep_expressions_aggregation_AggregationHandle
                       quickstep_storage_HashTableBase
@@ -1115,7 +1113,7 @@ target_link_libraries(quickstep_storage
                       quickstep_storage_BasicColumnStoreValueAccessor
                       quickstep_storage_BloomFilterIndexSubBlock
                       quickstep_storage_CSBTreeIndexSubBlock
-                      quickstep_storage_CollisionFreeAggregationStateHashTable
+                      quickstep_storage_CollisionFreeVectorTable
                       quickstep_storage_ColumnStoreUtil
                       quickstep_storage_CompressedBlockBuilder
                       quickstep_storage_CompressedColumnStoreTupleStorageSubBlock
@@ -1141,7 +1139,7 @@ target_link_libraries(quickstep_storage
                       quickstep_storage_InsertDestination_proto
                       quickstep_storage_LinearOpenAddressingHashTable
                       quickstep_storage_PartitionedHashTablePool
-                      quickstep_storage_PackedPayloadAggregationStateHashTable
+                      quickstep_storage_PackedPayloadHashTable
                       quickstep_storage_PreloaderThread
                       quickstep_storage_SMAIndexSubBlock
                       quickstep_storage_SeparateChainingHashTable

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/1e7a92a9/storage/CollisionFreeAggregationStateHashTable.cpp
----------------------------------------------------------------------
diff --git a/storage/CollisionFreeAggregationStateHashTable.cpp b/storage/CollisionFreeAggregationStateHashTable.cpp
deleted file mode 100644
index 2f3b336..0000000
--- a/storage/CollisionFreeAggregationStateHashTable.cpp
+++ /dev/null
@@ -1,285 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- **/
-
-#include "storage/CollisionFreeAggregationStateHashTable.hpp"
-
-#include <algorithm>
-#include <atomic>
-#include <cstddef>
-#include <cstdint>
-#include <cstdlib>
-#include <map>
-#include <memory>
-#include <vector>
-
-#include "storage/StorageBlockInfo.hpp"
-#include "storage/StorageManager.hpp"
-#include "storage/ValueAccessor.hpp"
-#include "storage/ValueAccessorMultiplexer.hpp"
-#include "storage/ValueAccessorUtil.hpp"
-#include "types/containers/ColumnVectorsValueAccessor.hpp"
-#include "utility/BarrieredReadWriteConcurrentBitVector.hpp"
-
-namespace quickstep {
-
-CollisionFreeAggregationStateHashTable::CollisionFreeAggregationStateHashTable(
-    const std::vector<const Type *> &key_types,
-    const std::size_t num_entries,
-    const std::vector<AggregationHandle *> &handles,
-    StorageManager *storage_manager)
-    : key_type_(key_types.front()),
-      num_entries_(num_entries),
-      num_handles_(handles.size()),
-      handles_(handles),
-      num_finalize_partitions_(std::min((num_entries_ >> 12u) + 1u, 80uL)),
-      storage_manager_(storage_manager) {
-  CHECK_EQ(1u, key_types.size());
-  DCHECK_GT(num_entries, 0u);
-
-  std::map<std::string, std::size_t> memory_offsets;
-  std::size_t required_memory = 0;
-
-  memory_offsets.emplace("existence_map", required_memory);
-  required_memory += CacheLineAlignedBytes(
-      BarrieredReadWriteConcurrentBitVector::BytesNeeded(num_entries));
-
-  for (std::size_t i = 0; i < num_handles_; ++i) {
-    const AggregationHandle *handle = handles_[i];
-    const std::vector<const Type *> argument_types = handle->getArgumentTypes();
-
-    std::size_t state_size = 0;
-    switch (handle->getAggregationID()) {
-      case AggregationID::kCount: {
-        state_size = sizeof(std::atomic<std::size_t>);
-        break;
-      }
-      case AggregationID::kSum: {
-        CHECK_EQ(1u, argument_types.size());
-        switch (argument_types.front()->getTypeID()) {
-          case TypeID::kInt:  // Fall through
-          case TypeID::kLong:
-            state_size = sizeof(std::atomic<std::int64_t>);
-            break;
-          case TypeID::kFloat:  // Fall through
-          case TypeID::kDouble:
-            state_size = sizeof(std::atomic<double>);
-            break;
-          default:
-            LOG(FATAL) << "Not implemented";
-        }
-        break;
-      }
-      default:
-        LOG(FATAL) << "Not implemented";
-    }
-
-    memory_offsets.emplace(std::string("state") + std::to_string(i),
-                           required_memory);
-    required_memory += CacheLineAlignedBytes(state_size * num_entries);
-  }
-
-  const std::size_t num_storage_slots =
-      storage_manager_->SlotsNeededForBytes(required_memory);
-
-  const block_id blob_id = storage_manager_->createBlob(num_storage_slots);
-  blob_ = storage_manager_->getBlobMutable(blob_id);
-
-  void *memory_start = blob_->getMemoryMutable();
-  existence_map_.reset(new BarrieredReadWriteConcurrentBitVector(
-      reinterpret_cast<char *>(memory_start) + memory_offsets.at("existence_map"),
-      num_entries,
-      false /* initialize */));
-
-  for (std::size_t i = 0; i < num_handles_; ++i) {
-    vec_tables_.emplace_back(
-        reinterpret_cast<char *>(memory_start) +
-            memory_offsets.at(std::string("state") + std::to_string(i)));
-  }
-
-  memory_size_ = required_memory;
-  num_init_partitions_ =
-      std::max(1uL, std::min(memory_size_ / (4uL * 1024u * 1024u), 80uL));
-}
-
-CollisionFreeAggregationStateHashTable::~CollisionFreeAggregationStateHashTable() {
-  const block_id blob_id = blob_->getID();
-  blob_.release();
-  storage_manager_->deleteBlockOrBlobFile(blob_id);
-}
-
-void CollisionFreeAggregationStateHashTable::destroyPayload() {
-}
-
-bool CollisionFreeAggregationStateHashTable::upsertValueAccessor(
-    const std::vector<std::vector<MultiSourceAttributeId>> &argument_ids,
-    const std::vector<MultiSourceAttributeId> &key_ids,
-    ValueAccessorMultiplexer *accessor_mux) {
-  DCHECK_EQ(1u, key_ids.size());
-
-  const ValueAccessorSource key_source = key_ids.front().source;
-  const attribute_id key_id = key_ids.front().attr_id;
-  const bool is_key_nullable = key_type_->isNullable();
-
-  if (handles_.empty()) {
-    InvokeOnValueAccessorMaybeTupleIdSequenceAdapter(
-        accessor_mux->getValueAccessorBySource(key_source),
-        [&](auto *accessor) -> void {  // NOLINT(build/c++11)
-      upsertValueAccessorKeyOnlyHelper(is_key_nullable,
-                                       key_type_,
-                                       key_id,
-                                       accessor);
-    });
-    return true;
-  }
-
-  DCHECK(accessor_mux->getDerivedAccessor()->getImplementationType()
-             == ValueAccessor::Implementation::kColumnVectors);
-  ValueAccessor *base_accessor = accessor_mux->getBaseAccessor();
-  ColumnVectorsValueAccessor *derived_accesor =
-      static_cast<ColumnVectorsValueAccessor *>(accessor_mux->getDerivedAccessor());
-
-  for (std::size_t i = 0; i < num_handles_; ++i) {
-    DCHECK_LE(argument_ids[i].size(), 1u);
-
-    const AggregationHandle *handle = handles_[i];
-    const auto &argument_types = handle->getArgumentTypes();
-    const auto &argument_ids_i = argument_ids[i];
-
-    ValueAccessorSource argument_source;
-    attribute_id argument_id;
-    const Type *argument_type;
-    bool is_argument_nullable;
-
-    if (argument_ids_i.empty()) {
-      argument_source = ValueAccessorSource::kInvalid;
-      argument_id = kInvalidAttributeID;
-
-      DCHECK(argument_types.empty());
-      argument_type = nullptr;
-      is_argument_nullable = false;
-    } else {
-      DCHECK_EQ(1u, argument_ids_i.size());
-      argument_source = argument_ids_i.front().source;
-      argument_id = argument_ids_i.front().attr_id;
-
-      DCHECK_EQ(1u, argument_types.size());
-      argument_type = argument_types.front();
-      is_argument_nullable = argument_type->isNullable();
-    }
-
-    InvokeOnValueAccessorMaybeTupleIdSequenceAdapter(
-        base_accessor,
-        [&](auto *accessor) -> void {  // NOLINT(build/c++11)
-      if (key_source == ValueAccessorSource::kBase) {
-        if (argument_source == ValueAccessorSource::kBase) {
-          upsertValueAccessorDispatchHelper<false>(is_key_nullable,
-                                                   is_argument_nullable,
-                                                   key_type_,
-                                                   argument_type,
-                                                   handle->getAggregationID(),
-                                                   key_id,
-                                                   argument_id,
-                                                   vec_tables_[i],
-                                                   accessor,
-                                                   accessor);
-        } else {
-          upsertValueAccessorDispatchHelper<true>(is_key_nullable,
-                                                  is_argument_nullable,
-                                                  key_type_,
-                                                  argument_type,
-                                                  handle->getAggregationID(),
-                                                  key_id,
-                                                  argument_id,
-                                                  vec_tables_[i],
-                                                  accessor,
-                                                  derived_accesor);
-        }
-      } else {
-        if (argument_source == ValueAccessorSource::kBase) {
-          upsertValueAccessorDispatchHelper<true>(is_key_nullable,
-                                                  is_argument_nullable,
-                                                  key_type_,
-                                                  argument_type,
-                                                  handle->getAggregationID(),
-                                                  key_id,
-                                                  argument_id,
-                                                  vec_tables_[i],
-                                                  derived_accesor,
-                                                  accessor);
-        } else {
-          upsertValueAccessorDispatchHelper<false>(is_key_nullable,
-                                                   is_argument_nullable,
-                                                   key_type_,
-                                                   argument_type,
-                                                   handle->getAggregationID(),
-                                                   key_id,
-                                                   argument_id,
-                                                   vec_tables_[i],
-                                                   derived_accesor,
-                                                   derived_accesor);
-        }
-      }
-    });
-  }
-  return true;
-}
-
-void CollisionFreeAggregationStateHashTable::finalizeKey(
-    const std::size_t partition_id,
-    NativeColumnVector *output_cv) const {
-  const std::size_t start_position =
-      calculatePartitionStartPosition(partition_id);
-  const std::size_t end_position =
-      calculatePartitionEndPosition(partition_id);
-
-  switch (key_type_->getTypeID()) {
-    case TypeID::kInt:
-      finalizeKeyInternal<int>(start_position, end_position, output_cv);
-      return;
-    case TypeID::kLong:
-      finalizeKeyInternal<std::int64_t>(start_position, end_position, output_cv);
-      return;
-    default:
-      LOG(FATAL) << "Not supported";
-  }
-}
-
-void CollisionFreeAggregationStateHashTable::finalizeState(
-    const std::size_t partition_id,
-    std::size_t handle_id,
-    NativeColumnVector *output_cv) const {
-  const std::size_t start_position =
-      calculatePartitionStartPosition(partition_id);
-  const std::size_t end_position =
-      calculatePartitionEndPosition(partition_id);
-
-  const AggregationHandle *handle = handles_[handle_id];
-  const auto &argument_types = handle->getArgumentTypes();
-  const Type *argument_type =
-      argument_types.empty() ? nullptr : argument_types.front();
-
-  finalizeStateDispatchHelper(handle->getAggregationID(),
-                              argument_type,
-                              vec_tables_[handle_id],
-                              start_position,
-                              end_position,
-                              output_cv);
-}
-
-}  // namespace quickstep

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/1e7a92a9/storage/CollisionFreeAggregationStateHashTable.hpp
----------------------------------------------------------------------
diff --git a/storage/CollisionFreeAggregationStateHashTable.hpp b/storage/CollisionFreeAggregationStateHashTable.hpp
deleted file mode 100644
index d738e4e..0000000
--- a/storage/CollisionFreeAggregationStateHashTable.hpp
+++ /dev/null
@@ -1,621 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- **/
-
-#ifndef QUICKSTEP_STORAGE_COLLISION_FREE_AGGREGATION_STATE_HASH_TABLE_HPP_
-#define QUICKSTEP_STORAGE_COLLISION_FREE_AGGREGATION_STATE_HASH_TABLE_HPP_
-
-#include <atomic>
-#include <cstddef>
-#include <cstdint>
-#include <cstring>
-#include <memory>
-#include <type_traits>
-#include <utility>
-#include <vector>
-
-#include "catalog/CatalogTypedefs.hpp"
-#include "expressions/aggregation/AggregationHandle.hpp"
-#include "expressions/aggregation/AggregationID.hpp"
-#include "storage/HashTableBase.hpp"
-#include "storage/StorageBlob.hpp"
-#include "storage/StorageConstants.hpp"
-#include "storage/ValueAccessor.hpp"
-#include "storage/ValueAccessorMultiplexer.hpp"
-#include "types/Type.hpp"
-#include "types/TypeID.hpp"
-#include "types/TypedValue.hpp"
-#include "types/containers/ColumnVector.hpp"
-#include "utility/BarrieredReadWriteConcurrentBitVector.hpp"
-#include "utility/Macros.hpp"
-
-#include "glog/logging.h"
-
-namespace quickstep {
-
-class ColumnVectorsValueAccessor;
-class StorageMnager;
-
-/** \addtogroup Storage
- *  @{
- */
-
-class CollisionFreeAggregationStateHashTable : public 
AggregationStateHashTableBase {
- public:
-  CollisionFreeAggregationStateHashTable(
-      const std::vector<const Type *> &key_types,
-      const std::size_t num_entries,
-      const std::vector<AggregationHandle *> &handles,
-      StorageManager *storage_manager);
-
-  ~CollisionFreeAggregationStateHashTable() override;
-
-  void destroyPayload() override;
-
-  inline std::size_t getNumInitializationPartitions() const {
-    return num_init_partitions_;
-  }
-
-  inline std::size_t getNumFinalizationPartitions() const {
-    return num_finalize_partitions_;
-  }
-
-  inline std::size_t getNumTuplesInPartition(
-      const std::size_t partition_id) const {
-    const std::size_t start_position =
-        calculatePartitionStartPosition(partition_id);
-    const std::size_t end_position =
-        calculatePartitionEndPosition(partition_id);
-    return existence_map_->onesCountInRange(start_position, end_position);
-  }
-
-  inline void initialize(const std::size_t partition_id) {
-    const std::size_t memory_segment_size =
-        (memory_size_ + num_init_partitions_ - 1) / num_init_partitions_;
-    const std::size_t memory_start = memory_segment_size * partition_id;
-    std::memset(reinterpret_cast<char *>(blob_->getMemoryMutable()) + 
memory_start,
-                0,
-                std::min(memory_segment_size, memory_size_ - memory_start));
-  }
-
-  bool upsertValueAccessor(
-      const std::vector<std::vector<MultiSourceAttributeId>> &argument_ids,
-      const std::vector<MultiSourceAttributeId> &key_ids,
-      ValueAccessorMultiplexer *accessor_mux) override;
-
-  void finalizeKey(const std::size_t partition_id,
-                   NativeColumnVector *output_cv) const;
-
-  void finalizeState(const std::size_t partition_id,
-                     std::size_t handle_id,
-                     NativeColumnVector *output_cv) const;
-
- private:
-  inline static std::size_t CacheLineAlignedBytes(const std::size_t 
actual_bytes) {
-    return (actual_bytes + kCacheLineBytes - 1) / kCacheLineBytes * 
kCacheLineBytes;
-  }
-
-  inline std::size_t calculatePartitionLength() const {
-    const std::size_t partition_length =
-        (num_entries_ + num_finalize_partitions_ - 1) / 
num_finalize_partitions_;
-    DCHECK_GE(partition_length, 0u);
-    return partition_length;
-  }
-
-  inline std::size_t calculatePartitionStartPosition(
-      const std::size_t partition_id) const {
-    return calculatePartitionLength() * partition_id;
-  }
-
-  inline std::size_t calculatePartitionEndPosition(
-      const std::size_t partition_id) const {
-    return std::min(calculatePartitionLength() * (partition_id + 1),
-                    num_entries_);
-  }
-
-  template <bool use_two_accessors, typename ...ArgTypes>
-  inline void upsertValueAccessorDispatchHelper(
-      const bool is_key_nullable,
-      const bool is_argument_nullable,
-      ArgTypes &&...args);
-
-  template <bool ...bool_values, typename ...ArgTypes>
-  inline void upsertValueAccessorDispatchHelper(
-      const Type *key_type,
-      ArgTypes &&...args);
-
-  template <bool use_two_accessors, bool is_key_nullable, bool 
is_argument_nullable,
-            typename KeyT, typename ...ArgTypes>
-  inline void upsertValueAccessorDispatchHelper(
-      const Type *argument_type,
-      const AggregationID agg_id,
-      ArgTypes &&...args);
-
-  template <bool use_two_accessors, bool is_key_nullable, bool 
is_argument_nullable,
-            typename KeyT, typename KeyValueAccessorT, typename 
ArgumentValueAccessorT>
-  inline void upsertValueAccessorCountHelper(
-      const attribute_id key_attr_id,
-      const attribute_id argument_id,
-      void *vec_table,
-      KeyValueAccessorT *key_accessor,
-      ArgumentValueAccessorT *argument_accessor);
-
-  template <bool use_two_accessors, bool is_key_nullable, bool 
is_argument_nullable,
-            typename KeyT, typename KeyValueAccessorT, typename 
ArgumentValueAccessorT>
-  inline void upsertValueAccessorSumHelper(
-      const Type *argument_type,
-      const attribute_id key_attr_id,
-      const attribute_id argument_id,
-      void *vec_table,
-      KeyValueAccessorT *key_accessor,
-      ArgumentValueAccessorT *argument_accessor);
-
-  template <typename ...ArgTypes>
-  inline void upsertValueAccessorKeyOnlyHelper(
-      const bool is_key_nullable,
-      const Type *key_type,
-      ArgTypes &&...args);
-
-  template <bool is_key_nullable, typename KeyT, typename KeyValueAccessorT>
-  inline void upsertValueAccessorKeyOnly(
-      const attribute_id key_attr_id,
-      KeyValueAccessorT *key_accessor);
-
-  template <bool is_key_nullable, typename KeyT, typename KeyValueAccessorT>
-  inline void upsertValueAccessorCountNullary(
-      const attribute_id key_attr_id,
-      std::atomic<std::size_t> *vec_table,
-      KeyValueAccessorT *key_accessor);
-
-  template <bool use_two_accessors, bool is_key_nullable, typename KeyT,
-            typename KeyValueAccessorT, typename ArgumentValueAccessorT>
-  inline void upsertValueAccessorCountUnary(
-      const attribute_id key_attr_id,
-      const attribute_id argument_id,
-      std::atomic<std::size_t> *vec_table,
-      KeyValueAccessorT *key_accessor,
-      ArgumentValueAccessorT *argument_accessor);
-
-  template <bool use_two_accessors, bool is_key_nullable, bool 
is_argument_nullable,
-            typename KeyT, typename ArgumentT, typename StateT,
-            typename KeyValueAccessorT, typename ArgumentValueAccessorT>
-  inline void upsertValueAccessorIntegerSum(
-      const attribute_id key_attr_id,
-      const attribute_id argument_id,
-      std::atomic<StateT> *vec_table,
-      KeyValueAccessorT *key_accessor,
-      ArgumentValueAccessorT *argument_accessor);
-
-  template <bool use_two_accessors, bool is_key_nullable, bool 
is_argument_nullable,
-            typename KeyT, typename ArgumentT, typename StateT,
-            typename KeyValueAccessorT, typename ArgumentValueAccessorT>
-  inline void upsertValueAccessorGenericSum(
-      const attribute_id key_attr_id,
-      const attribute_id argument_id,
-      std::atomic<StateT> *vec_table,
-      KeyValueAccessorT *key_accessor,
-      ArgumentValueAccessorT *argument_accessor);
-
-  template <typename KeyT>
-  inline void finalizeKeyInternal(const std::size_t start_position,
-                                  const std::size_t end_position,
-                                  NativeColumnVector *output_cv) const {
-    std::size_t loc = start_position - 1;
-    while ((loc = existence_map_->nextOne(loc)) < end_position) {
-      *static_cast<KeyT *>(output_cv->getPtrForDirectWrite()) = loc;
-    }
-  }
-
-  template <typename ...ArgTypes>
-  inline void finalizeStateDispatchHelper(const AggregationID agg_id,
-                                          const Type *argument_type,
-                                          const void *vec_table,
-                                          ArgTypes &&...args) const {
-    switch (agg_id) {
-       case AggregationID::kCount:
-         finalizeStateCount(static_cast<const std::atomic<std::size_t> 
*>(vec_table),
-                            std::forward<ArgTypes>(args)...);
-         return;
-       case AggregationID::kSum:
-         finalizeStateSumHelper(argument_type,
-                                vec_table,
-                                std::forward<ArgTypes>(args)...);
-         return;
-       default:
-         LOG(FATAL) << "Not supported";
-    }
-  }
-
-  template <typename ...ArgTypes>
-  inline void finalizeStateSumHelper(const Type *argument_type,
-                                     const void *vec_table,
-                                     ArgTypes &&...args) const {
-    DCHECK(argument_type != nullptr);
-
-    switch (argument_type->getTypeID()) {
-      case TypeID::kInt:    // Fall through
-      case TypeID::kLong:
-        finalizeStateSum<std::int64_t>(
-            static_cast<const std::atomic<std::int64_t> *>(vec_table),
-            std::forward<ArgTypes>(args)...);
-        return;
-      case TypeID::kFloat:  // Fall through
-      case TypeID::kDouble:
-        finalizeStateSum<double>(
-            static_cast<const std::atomic<double> *>(vec_table),
-            std::forward<ArgTypes>(args)...);
-        return;
-      default:
-        LOG(FATAL) << "Not supported";
-    }
-  }
-
-  inline void finalizeStateCount(const std::atomic<std::size_t> *vec_table,
-                                 const std::size_t start_position,
-                                 const std::size_t end_position,
-                                 NativeColumnVector *output_cv) const {
-    std::size_t loc = start_position - 1;
-    while ((loc = existence_map_->nextOne(loc)) < end_position) {
-      *static_cast<std::int64_t *>(output_cv->getPtrForDirectWrite()) =
-          vec_table[loc].load(std::memory_order_relaxed);
-    }
-  }
-
-  template <typename ResultT, typename StateT>
-  inline void finalizeStateSum(const std::atomic<StateT> *vec_table,
-                               const std::size_t start_position,
-                               const std::size_t end_position,
-                               NativeColumnVector *output_cv) const {
-    std::size_t loc = start_position - 1;
-    while ((loc = existence_map_->nextOne(loc)) < end_position) {
-      *static_cast<ResultT *>(output_cv->getPtrForDirectWrite()) =
-          vec_table[loc].load(std::memory_order_relaxed);
-    }
-  }
-
-  const Type *key_type_;
-  const std::size_t num_entries_;
-
-  const std::size_t num_handles_;
-  const std::vector<AggregationHandle *> handles_;
-
-  std::unique_ptr<BarrieredReadWriteConcurrentBitVector> existence_map_;
-  std::vector<void *> vec_tables_;
-
-  const std::size_t num_finalize_partitions_;
-
-  StorageManager *storage_manager_;
-  MutableBlobReference blob_;
-
-  std::size_t memory_size_;
-  std::size_t num_init_partitions_;
-
-  DISALLOW_COPY_AND_ASSIGN(CollisionFreeAggregationStateHashTable);
-};
-
-// ----------------------------------------------------------------------------
-// Implementations of template methods follow.
-
-template <bool use_two_accessors, typename ...ArgTypes>
-inline void CollisionFreeAggregationStateHashTable
-    ::upsertValueAccessorDispatchHelper(const bool is_key_nullable,
-                                        const bool is_argument_nullable,
-                                        ArgTypes &&...args) {
-  if (is_key_nullable) {
-    if (is_argument_nullable) {
-      upsertValueAccessorDispatchHelper<use_two_accessors, true, true>(
-          std::forward<ArgTypes>(args)...);
-    } else {
-      upsertValueAccessorDispatchHelper<use_two_accessors, true, false>(
-          std::forward<ArgTypes>(args)...);
-    }
-  } else {
-    if (is_argument_nullable) {
-      upsertValueAccessorDispatchHelper<use_two_accessors, false, true>(
-          std::forward<ArgTypes>(args)...);
-    } else {
-      upsertValueAccessorDispatchHelper<use_two_accessors, false, false>(
-          std::forward<ArgTypes>(args)...);
-    }
-  }
-}
-
-template <bool ...bool_values, typename ...ArgTypes>
-inline void CollisionFreeAggregationStateHashTable
-    ::upsertValueAccessorDispatchHelper(const Type *key_type,
-                                        ArgTypes &&...args) {
-  switch (key_type->getTypeID()) {
-    case TypeID::kInt:
-      upsertValueAccessorDispatchHelper<bool_values..., int>(
-          std::forward<ArgTypes>(args)...);
-      return;
-    case TypeID::kLong:
-      upsertValueAccessorDispatchHelper<bool_values..., std::int64_t>(
-          std::forward<ArgTypes>(args)...);
-      return;
-    default:
-      LOG(FATAL) << "Not supported";
-  }
-}
-
-template <bool use_two_accessors, bool is_key_nullable, bool 
is_argument_nullable,
-          typename KeyT, typename ...ArgTypes>
-inline void CollisionFreeAggregationStateHashTable
-    ::upsertValueAccessorDispatchHelper(const Type *argument_type,
-                                        const AggregationID agg_id,
-                                        ArgTypes &&...args) {
-  switch (agg_id) {
-     case AggregationID::kCount:
-       upsertValueAccessorCountHelper<
-           use_two_accessors, is_key_nullable, is_argument_nullable, KeyT>(
-               std::forward<ArgTypes>(args)...);
-       return;
-     case AggregationID::kSum:
-       upsertValueAccessorSumHelper<
-           use_two_accessors, is_key_nullable, is_argument_nullable, KeyT>(
-               argument_type, std::forward<ArgTypes>(args)...);
-       return;
-     default:
-       LOG(FATAL) << "Not supported";
-  }
-}
-
-template <typename ...ArgTypes>
-inline void CollisionFreeAggregationStateHashTable
-    ::upsertValueAccessorKeyOnlyHelper(const bool is_key_nullable,
-                                       const Type *key_type,
-                                       ArgTypes &&...args) {
-  switch (key_type->getTypeID()) {
-    case TypeID::kInt: {
-      if (is_key_nullable) {
-        upsertValueAccessorKeyOnly<true, int>(std::forward<ArgTypes>(args)...);
-      } else {
-        upsertValueAccessorKeyOnly<false, 
int>(std::forward<ArgTypes>(args)...);
-      }
-      return;
-    }
-    case TypeID::kLong: {
-      if (is_key_nullable) {
-        upsertValueAccessorKeyOnly<true, 
std::int64_t>(std::forward<ArgTypes>(args)...);
-      } else {
-        upsertValueAccessorKeyOnly<false, 
std::int64_t>(std::forward<ArgTypes>(args)...);
-      }
-      return;
-    }
-    default:
-      LOG(FATAL) << "Not supported";
-  }
-}
-
-template <bool is_key_nullable, typename KeyT, typename ValueAccessorT>
-inline void CollisionFreeAggregationStateHashTable
-    ::upsertValueAccessorKeyOnly(const attribute_id key_attr_id,
-                                 ValueAccessorT *accessor) {
-  accessor->beginIteration();
-  while (accessor->next()) {
-    const KeyT *key = static_cast<const KeyT *>(
-        accessor->template getUntypedValue<is_key_nullable>(key_attr_id));
-    if (is_key_nullable && key == nullptr) {
-      continue;
-    }
-    existence_map_->setBit(*key);
-  }
-}
-
-template <bool use_two_accessors, bool is_key_nullable, bool 
is_argument_nullable,
-          typename KeyT, typename KeyValueAccessorT, typename 
ArgumentValueAccessorT>
-inline void CollisionFreeAggregationStateHashTable
-    ::upsertValueAccessorCountHelper(const attribute_id key_attr_id,
-                                     const attribute_id argument_id,
-                                     void *vec_table,
-                                     KeyValueAccessorT *key_accessor,
-                                     ArgumentValueAccessorT 
*argument_accessor) {
-  DCHECK_GE(key_attr_id, 0u);
-
-  if (is_argument_nullable && argument_id != kInvalidAttributeID) {
-    upsertValueAccessorCountUnary<use_two_accessors, is_key_nullable, KeyT>(
-        key_attr_id,
-        argument_id,
-        static_cast<std::atomic<std::size_t> *>(vec_table),
-        key_accessor,
-        argument_accessor);
-    return;
-  } else {
-    upsertValueAccessorCountNullary<is_key_nullable, KeyT>(
-        key_attr_id,
-        static_cast<std::atomic<std::size_t> *>(vec_table),
-        key_accessor);
-    return;
-  }
-}
-
-template <bool use_two_accessors, bool is_key_nullable, bool 
is_argument_nullable,
-          typename KeyT, typename KeyValueAccessorT, typename 
ArgumentValueAccessorT>
-inline void CollisionFreeAggregationStateHashTable
-    ::upsertValueAccessorSumHelper(const Type *argument_type,
-                                   const attribute_id key_attr_id,
-                                   const attribute_id argument_id,
-                                   void *vec_table,
-                                   KeyValueAccessorT *key_accessor,
-                                   ArgumentValueAccessorT *argument_accessor) {
-  DCHECK_GE(key_attr_id, 0u);
-  DCHECK_GE(argument_id, 0u);
-  DCHECK(argument_type != nullptr);
-
-  switch (argument_type->getTypeID()) {
-    case TypeID::kInt:
-      upsertValueAccessorIntegerSum<
-          use_two_accessors, is_key_nullable, is_argument_nullable, KeyT, int>(
-              key_attr_id,
-              argument_id,
-              static_cast<std::atomic<std::int64_t> *>(vec_table),
-              key_accessor,
-              argument_accessor);
-      return;
-    case TypeID::kLong:
-      upsertValueAccessorIntegerSum<
-          use_two_accessors, is_key_nullable, is_argument_nullable, KeyT, 
std::int64_t>(
-              key_attr_id,
-              argument_id,
-              static_cast<std::atomic<std::int64_t> *>(vec_table),
-              key_accessor,
-              argument_accessor);
-      return;
-    case TypeID::kFloat:
-      upsertValueAccessorGenericSum<
-          use_two_accessors, is_key_nullable, is_argument_nullable, KeyT, 
float>(
-              key_attr_id,
-              argument_id,
-              static_cast<std::atomic<double> *>(vec_table),
-              key_accessor,
-              argument_accessor);
-      return;
-    case TypeID::kDouble:
-      upsertValueAccessorGenericSum<
-          use_two_accessors, is_key_nullable, is_argument_nullable, KeyT, 
double>(
-              key_attr_id,
-              argument_id,
-              static_cast<std::atomic<double> *>(vec_table),
-              key_accessor,
-              argument_accessor);
-      return;
-    default:
-      LOG(FATAL) << "Not supported";
-  }
-}
-
-template <bool is_key_nullable, typename KeyT, typename ValueAccessorT>
-inline void CollisionFreeAggregationStateHashTable
-    ::upsertValueAccessorCountNullary(const attribute_id key_attr_id,
-                                      std::atomic<std::size_t> *vec_table,
-                                      ValueAccessorT *accessor) {
-  accessor->beginIteration();
-  while (accessor->next()) {
-    const KeyT *key = static_cast<const KeyT *>(
-        accessor->template getUntypedValue<is_key_nullable>(key_attr_id));
-    if (is_key_nullable && key == nullptr) {
-      continue;
-    }
-    const std::size_t loc = *key;
-    vec_table[loc].fetch_add(1u, std::memory_order_relaxed);
-    existence_map_->setBit(loc);
-  }
-}
-
-template <bool use_two_accessors, bool is_key_nullable, typename KeyT,
-          typename KeyValueAccessorT, typename ArgumentValueAccessorT>
-inline void CollisionFreeAggregationStateHashTable
-    ::upsertValueAccessorCountUnary(const attribute_id key_attr_id,
-                                    const attribute_id argument_id,
-                                    std::atomic<std::size_t> *vec_table,
-                                    KeyValueAccessorT *key_accessor,
-                                    ArgumentValueAccessorT *argument_accessor) 
{
-  key_accessor->beginIteration();
-  if (use_two_accessors) {
-    argument_accessor->beginIteration();
-  }
-  while (key_accessor->next()) {
-    if (use_two_accessors) {
-      argument_accessor->next();
-    }
-    const KeyT *key = static_cast<const KeyT *>(
-        key_accessor->template getUntypedValue<is_key_nullable>(key_attr_id));
-    if (is_key_nullable && key == nullptr) {
-      continue;
-    }
-    const std::size_t loc = *key;
-    existence_map_->setBit(loc);
-    if (argument_accessor->getUntypedValue(argument_id) == nullptr) {
-      continue;
-    }
-    vec_table[loc].fetch_add(1u, std::memory_order_relaxed);
-  }
-}
-
-template <bool use_two_accessors, bool is_key_nullable, bool 
is_argument_nullable,
-          typename KeyT, typename ArgumentT, typename StateT,
-          typename KeyValueAccessorT, typename ArgumentValueAccessorT>
-inline void CollisionFreeAggregationStateHashTable
-    ::upsertValueAccessorIntegerSum(const attribute_id key_attr_id,
-                                    const attribute_id argument_id,
-                                    std::atomic<StateT> *vec_table,
-                                    KeyValueAccessorT *key_accessor,
-                                    ArgumentValueAccessorT *argument_accessor) 
{
-  key_accessor->beginIteration();
-  if (use_two_accessors) {
-    argument_accessor->beginIteration();
-  }
-  while (key_accessor->next()) {
-    if (use_two_accessors) {
-      argument_accessor->next();
-    }
-    const KeyT *key = static_cast<const KeyT *>(
-        key_accessor->template getUntypedValue<is_key_nullable>(key_attr_id));
-    if (is_key_nullable && key == nullptr) {
-      continue;
-    }
-    const std::size_t loc = *key;
-    existence_map_->setBit(loc);
-    const ArgumentT *argument = static_cast<const ArgumentT *>(
-        argument_accessor->template 
getUntypedValue<is_argument_nullable>(argument_id));
-    if (is_argument_nullable && argument == nullptr) {
-      continue;
-    }
-    vec_table[loc].fetch_add(*argument, std::memory_order_relaxed);
-  }
-}
-
-template <bool use_two_accessors, bool is_key_nullable, bool 
is_argument_nullable,
-          typename KeyT, typename ArgumentT, typename StateT,
-          typename KeyValueAccessorT, typename ArgumentValueAccessorT>
-inline void CollisionFreeAggregationStateHashTable
-    ::upsertValueAccessorGenericSum(const attribute_id key_attr_id,
-                                    const attribute_id argument_id,
-                                    std::atomic<StateT> *vec_table,
-                                    KeyValueAccessorT *key_accessor,
-                                    ArgumentValueAccessorT *argument_accessor) 
{
-  key_accessor->beginIteration();
-  if (use_two_accessors) {
-    argument_accessor->beginIteration();
-  }
-  while (key_accessor->next()) {
-    if (use_two_accessors) {
-      argument_accessor->next();
-    }
-    const KeyT *key = static_cast<const KeyT *>(
-        key_accessor->template getUntypedValue<is_key_nullable>(key_attr_id));
-    if (is_key_nullable && key == nullptr) {
-      continue;
-    }
-    const std::size_t loc = *key;
-    existence_map_->setBit(loc);
-    const ArgumentT *argument = static_cast<const ArgumentT *>(
-        argument_accessor->template 
getUntypedValue<is_argument_nullable>(argument_id));
-    if (is_argument_nullable && argument == nullptr) {
-      continue;
-    }
-    const ArgumentT arg_val = *argument;
-    std::atomic<StateT> &state = vec_table[loc];
-    StateT state_val = state.load(std::memory_order_relaxed);
-    while(!state.compare_exchange_weak(state_val, state_val + arg_val)) {}
-  }
-}
-
-}  // namespace quickstep
-
-#endif  // QUICKSTEP_STORAGE_COLLISION_FREE_AGGREGATION_STATE_HASH_TABLE_HPP_

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/1e7a92a9/storage/CollisionFreeVectorTable.cpp
----------------------------------------------------------------------
diff --git a/storage/CollisionFreeVectorTable.cpp b/storage/CollisionFreeVectorTable.cpp
new file mode 100644
index 0000000..8065cd9
--- /dev/null
+++ b/storage/CollisionFreeVectorTable.cpp
@@ -0,0 +1,283 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ **/
+
+#include "storage/CollisionFreeVectorTable.hpp"
+
+#include <algorithm>
+#include <atomic>
+#include <cstddef>
+#include <cstdint>
+#include <cstdlib>
+#include <map>
+#include <memory>
+#include <vector>
+
+#include "storage/StorageBlockInfo.hpp"
+#include "storage/StorageManager.hpp"
+#include "storage/ValueAccessor.hpp"
+#include "storage/ValueAccessorMultiplexer.hpp"
+#include "storage/ValueAccessorUtil.hpp"
+#include "types/containers/ColumnVectorsValueAccessor.hpp"
+#include "utility/BarrieredReadWriteConcurrentBitVector.hpp"
+
+namespace quickstep {
+
+// Builds a table addressed directly by a single integer key.
+//
+// One blob is requested from storage_manager and carved into cache-line
+// aligned regions: first the existence bit vector, then one state array of
+// num_entries slots per aggregation handle. Slot types are
+//   COUNT                  -> std::atomic<std::size_t>
+//   SUM over kInt/kLong    -> std::atomic<std::int64_t>
+//   SUM over kFloat/kDouble-> std::atomic<double>
+// Any other aggregate or argument type is fatal.
+//
+// The bit vector is constructed with its "initialize" flag set to false, so
+// nothing is zeroed here.
+// NOTE(review): presumably the memory is cleared later, one initialization
+// partition at a time -- confirm against the header.
+//
+// key_types must hold exactly one type; num_entries must be positive.
+CollisionFreeVectorTable::CollisionFreeVectorTable(
+    const std::vector<const Type *> &key_types,
+    const std::size_t num_entries,
+    const std::vector<AggregationHandle *> &handles,
+    StorageManager *storage_manager)
+    // Guard the front() call: the size check below can only run after the
+    // initializer list, so an empty vector must not be dereferenced here.
+    : key_type_(key_types.empty() ? nullptr : key_types.front()),
+      num_entries_(num_entries),
+      num_handles_(handles.size()),
+      handles_(handles),
+      // One finalization partition per 4096 entries (plus one), at most 80.
+      // The explicit template argument keeps this compiling on platforms
+      // where std::size_t is not "unsigned long".
+      num_finalize_partitions_(
+          std::min<std::size_t>((num_entries_ >> 12u) + 1u, 80u)),
+      storage_manager_(storage_manager) {
+  CHECK_EQ(1u, key_types.size());
+  DCHECK_GT(num_entries, 0u);
+
+  // The existence map occupies the start of the blob.
+  const std::size_t existence_map_offset = 0;
+  std::size_t required_memory = CacheLineAlignedBytes(
+      BarrieredReadWriteConcurrentBitVector::BytesNeeded(num_entries));
+
+  // state_offsets[i] is the byte offset of the state array of handles_[i].
+  std::vector<std::size_t> state_offsets;
+  state_offsets.reserve(num_handles_);
+
+  for (const AggregationHandle *handle : handles_) {
+    const auto &argument_types = handle->getArgumentTypes();
+
+    std::size_t state_size = 0;
+    switch (handle->getAggregationID()) {
+      case AggregationID::kCount: {
+        state_size = sizeof(std::atomic<std::size_t>);
+        break;
+      }
+      case AggregationID::kSum: {
+        CHECK_EQ(1u, argument_types.size());
+        switch (argument_types.front()->getTypeID()) {
+          case TypeID::kInt:  // Fall through
+          case TypeID::kLong:
+            state_size = sizeof(std::atomic<std::int64_t>);
+            break;
+          case TypeID::kFloat:  // Fall through
+          case TypeID::kDouble:
+            state_size = sizeof(std::atomic<double>);
+            break;
+          default:
+            LOG(FATAL) << "Not implemented";
+        }
+        break;
+      }
+      default:
+        LOG(FATAL) << "Not implemented";
+    }
+
+    state_offsets.push_back(required_memory);
+    required_memory += CacheLineAlignedBytes(state_size * num_entries);
+  }
+
+  const std::size_t num_storage_slots =
+      storage_manager_->SlotsNeededForBytes(required_memory);
+
+  const block_id blob_id = storage_manager_->createBlob(num_storage_slots);
+  blob_ = storage_manager_->getBlobMutable(blob_id);
+
+  char *memory_start = static_cast<char *>(blob_->getMemoryMutable());
+  existence_map_.reset(new BarrieredReadWriteConcurrentBitVector(
+      memory_start + existence_map_offset,
+      num_entries,
+      false /* initialize */));
+
+  for (const std::size_t state_offset : state_offsets) {
+    vec_tables_.emplace_back(memory_start + state_offset);
+  }
+
+  memory_size_ = required_memory;
+  // One initialization partition per 4 MiB of table memory, clamped to
+  // the range [1, 80].
+  num_init_partitions_ = std::max<std::size_t>(
+      1u,
+      std::min<std::size_t>(memory_size_ / (4u * 1024u * 1024u), 80u));
+}
+
+// Destructor: asks the storage manager to delete the blob that was created
+// in the constructor.
+CollisionFreeVectorTable::~CollisionFreeVectorTable() {
+  // Read the id before releasing blob_; it is needed for the delete call.
+  const block_id blob_id = blob_->getID();
+  // NOTE(review): presumably drops this table's reference so the storage
+  // manager is free to remove the blob -- confirm release() semantics.
+  blob_.release();
+  storage_manager_->deleteBlockOrBlobFile(blob_id);
+}
+
+// No-op: the body is empty, so calling this has no effect on the table.
+void CollisionFreeVectorTable::destroyPayload() {
+}
+
+// Folds every tuple exposed by accessor_mux into this table.
+//
+// key_ids must name exactly one key attribute. argument_ids[i] names the
+// (at most one) argument attribute of handles_[i]; an empty entry means the
+// aggregate takes no argument.
+//
+// With no aggregation handles only key presence is recorded, reading from
+// whichever accessor the key's source selects. Otherwise each handle is
+// processed in turn, dispatching on where the key and the argument come from
+// (base accessor vs. derived accessor). The <true> instantiations are used
+// when the two come from different accessors, <false> when they share one.
+//
+// Always returns true.
+bool CollisionFreeVectorTable::upsertValueAccessor(
+    const std::vector<std::vector<MultiSourceAttributeId>> &argument_ids,
+    const std::vector<MultiSourceAttributeId> &key_ids,
+    ValueAccessorMultiplexer *accessor_mux) {
+  DCHECK_EQ(1u, key_ids.size());
+
+  const ValueAccessorSource key_source = key_ids.front().source;
+  const attribute_id key_id = key_ids.front().attr_id;
+  const bool is_key_nullable = key_type_->isNullable();
+
+  if (handles_.empty()) {
+    InvokeOnValueAccessorMaybeTupleIdSequenceAdapter(
+        accessor_mux->getValueAccessorBySource(key_source),
+        [&](auto *accessor) -> void {  // NOLINT(build/c++11)
+      upsertValueAccessorKeyOnlyHelper(is_key_nullable,
+                                       key_type_,
+                                       key_id,
+                                       accessor);
+    });
+    return true;
+  }
+
+  // argument_ids is indexed by handle position below.
+  DCHECK_LE(num_handles_, argument_ids.size());
+
+  ValueAccessor *base_accessor = accessor_mux->getBaseAccessor();
+  auto *raw_derived_accessor = accessor_mux->getDerivedAccessor();
+  // Only inspect the derived accessor if there is one, so that this debug
+  // check cannot itself dereference a null pointer.
+  // NOTE(review): assumes the multiplexer may carry no derived accessor --
+  // confirm against ValueAccessorMultiplexer.
+  DCHECK(raw_derived_accessor == nullptr ||
+         raw_derived_accessor->getImplementationType()
+             == ValueAccessor::Implementation::kColumnVectors);
+  ColumnVectorsValueAccessor *derived_accessor =
+      static_cast<ColumnVectorsValueAccessor *>(raw_derived_accessor);
+
+  for (std::size_t i = 0; i < num_handles_; ++i) {
+    DCHECK_LE(argument_ids[i].size(), 1u);
+
+    const AggregationHandle *handle = handles_[i];
+    const auto &argument_types = handle->getArgumentTypes();
+    const auto &argument_ids_i = argument_ids[i];
+
+    // Defaults describe an aggregate without an argument.
+    ValueAccessorSource argument_source = ValueAccessorSource::kInvalid;
+    attribute_id argument_id = kInvalidAttributeID;
+    const Type *argument_type = nullptr;
+    bool is_argument_nullable = false;
+
+    if (argument_ids_i.empty()) {
+      DCHECK(argument_types.empty());
+    } else {
+      DCHECK_EQ(1u, argument_ids_i.size());
+      argument_source = argument_ids_i.front().source;
+      argument_id = argument_ids_i.front().attr_id;
+
+      DCHECK_EQ(1u, argument_types.size());
+      argument_type = argument_types.front();
+      is_argument_nullable = argument_type->isNullable();
+    }
+
+    InvokeOnValueAccessorMaybeTupleIdSequenceAdapter(
+        base_accessor,
+        [&](auto *accessor) -> void {  // NOLINT(build/c++11)
+      if (key_source == ValueAccessorSource::kBase) {
+        if (argument_source == ValueAccessorSource::kBase) {
+          // Key and argument both read from the base accessor.
+          upsertValueAccessorDispatchHelper<false>(is_key_nullable,
+                                                   is_argument_nullable,
+                                                   key_type_,
+                                                   argument_type,
+                                                   handle->getAggregationID(),
+                                                   key_id,
+                                                   argument_id,
+                                                   vec_tables_[i],
+                                                   accessor,
+                                                   accessor);
+        } else {
+          // Key from the base accessor, argument from the derived one.
+          upsertValueAccessorDispatchHelper<true>(is_key_nullable,
+                                                  is_argument_nullable,
+                                                  key_type_,
+                                                  argument_type,
+                                                  handle->getAggregationID(),
+                                                  key_id,
+                                                  argument_id,
+                                                  vec_tables_[i],
+                                                  accessor,
+                                                  derived_accessor);
+        }
+      } else {
+        if (argument_source == ValueAccessorSource::kBase) {
+          // Key from the derived accessor, argument from the base one.
+          upsertValueAccessorDispatchHelper<true>(is_key_nullable,
+                                                  is_argument_nullable,
+                                                  key_type_,
+                                                  argument_type,
+                                                  handle->getAggregationID(),
+                                                  key_id,
+                                                  argument_id,
+                                                  vec_tables_[i],
+                                                  derived_accessor,
+                                                  accessor);
+        } else {
+          // Key and argument both read from the derived accessor.
+          upsertValueAccessorDispatchHelper<false>(is_key_nullable,
+                                                   is_argument_nullable,
+                                                   key_type_,
+                                                   argument_type,
+                                                   handle->getAggregationID(),
+                                                   key_id,
+                                                   argument_id,
+                                                   vec_tables_[i],
+                                                   derived_accessor,
+                                                   derived_accessor);
+        }
+      }
+    });
+  }
+  return true;
+}
+
+// Writes the keys of one finalization partition into output_cv.
+//
+// The partition's [begin, end) position range is computed from partition_id
+// and handed to finalizeKeyInternal, instantiated for the C++ type matching
+// the key's type id (int for kInt, std::int64_t for kLong). Any other key
+// type is fatal.
+void CollisionFreeVectorTable::finalizeKey(
+    const std::size_t partition_id,
+    NativeColumnVector *output_cv) const {
+  const std::size_t range_begin =
+      calculatePartitionStartPosition(partition_id);
+  const std::size_t range_end =
+      calculatePartitionEndPosition(partition_id);
+
+  const TypeID key_type_id = key_type_->getTypeID();
+  if (key_type_id == TypeID::kInt) {
+    finalizeKeyInternal<int>(range_begin, range_end, output_cv);
+  } else if (key_type_id == TypeID::kLong) {
+    finalizeKeyInternal<std::int64_t>(range_begin, range_end, output_cv);
+  } else {
+    LOG(FATAL) << "Not supported";
+  }
+}
+
+// Writes the aggregation states of one handle, restricted to one
+// finalization partition, into output_cv.
+//
+// handle_id selects both the aggregation handle and its state array. The
+// handle's first argument type (or nullptr when it has none) is passed along
+// so the dispatch helper can pick the matching state representation.
+void CollisionFreeVectorTable::finalizeState(
+    const std::size_t partition_id,
+    std::size_t handle_id,
+    NativeColumnVector *output_cv) const {
+  const std::size_t range_begin =
+      calculatePartitionStartPosition(partition_id);
+  const std::size_t range_end =
+      calculatePartitionEndPosition(partition_id);
+
+  const AggregationHandle *agg_handle = handles_[handle_id];
+
+  const Type *first_argument_type = nullptr;
+  const auto &handle_argument_types = agg_handle->getArgumentTypes();
+  if (!handle_argument_types.empty()) {
+    first_argument_type = handle_argument_types.front();
+  }
+
+  finalizeStateDispatchHelper(agg_handle->getAggregationID(),
+                              first_argument_type,
+                              vec_tables_[handle_id],
+                              range_begin,
+                              range_end,
+                              output_cv);
+}
+
+}  // namespace quickstep

Reply via email to