Repository: incubator-quickstep Updated Branches: refs/heads/destroy-agg-state-operator b162047b6 -> 590ba4dac (forced update)
Introduced DestroyAggregationState operator - Similar to the pattern with DestroyHash, this operator destroys the AggregationState once the Finalize aggregation operator finishes its execution. - Optimizer support for DestroyAggregationState operator. - Removed unused QueryContext::releaseAggregationState method. Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/590ba4da Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/590ba4da Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/590ba4da Branch: refs/heads/destroy-agg-state-operator Commit: 590ba4dacfe0c65a1d254e79daaecbf4f1f5854b Parents: 1d10422 Author: Harshad Deshmukh <hbdeshm...@apache.org> Authored: Tue Aug 23 11:00:57 2016 -0500 Committer: Harshad Deshmukh <hbdeshm...@apache.org> Committed: Tue Sep 6 10:21:39 2016 -0500 ---------------------------------------------------------------------- query_execution/QueryContext.hpp | 10 +- query_optimizer/CMakeLists.txt | 1 + query_optimizer/ExecutionGenerator.cpp | 10 ++ relational_operators/CMakeLists.txt | 16 +++ .../DestroyAggregationStateOperator.cpp | 64 ++++++++++ .../DestroyAggregationStateOperator.hpp | 120 +++++++++++++++++++ .../FinalizeAggregationOperator.cpp | 2 +- .../FinalizeAggregationOperator.hpp | 3 +- relational_operators/WorkOrder.proto | 7 ++ relational_operators/WorkOrderFactory.cpp | 16 ++- .../tests/AggregationOperator_unittest.cpp | 25 ++++ 11 files changed, 264 insertions(+), 10 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/590ba4da/query_execution/QueryContext.hpp ---------------------------------------------------------------------- diff --git a/query_execution/QueryContext.hpp b/query_execution/QueryContext.hpp index c54c7ff..393b55e 100644 --- a/query_execution/QueryContext.hpp +++ b/query_execution/QueryContext.hpp @@ -182,16 +182,14 @@ class QueryContext { } /** - * @brief Release the given AggregationOperationState. + * @brief Destroy the given aggregation state. * - * @param id The id of the AggregationOperationState to destroy. - * - * @return The AggregationOperationState, alreadly created in the constructor. + * @param id The ID of the AggregationOperationState to destroy. **/ - inline AggregationOperationState* releaseAggregationState(const aggregation_state_id id) { + inline void destroyAggregationState(const aggregation_state_id id) { DCHECK_LT(id, aggregation_states_.size()); DCHECK(aggregation_states_[id]); - return aggregation_states_[id].release(); + aggregation_states_[id].reset(nullptr); } /** http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/590ba4da/query_optimizer/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/query_optimizer/CMakeLists.txt b/query_optimizer/CMakeLists.txt index 56ae52f..32f7885 100644 --- a/query_optimizer/CMakeLists.txt +++ b/query_optimizer/CMakeLists.txt @@ -118,6 +118,7 @@ target_link_libraries(quickstep_queryoptimizer_ExecutionGenerator quickstep_relationaloperators_CreateIndexOperator quickstep_relationaloperators_CreateTableOperator quickstep_relationaloperators_DeleteOperator + quickstep_relationaloperators_DestroyAggregationStateOperator quickstep_relationaloperators_DestroyHashOperator quickstep_relationaloperators_DropTableOperator quickstep_relationaloperators_FinalizeAggregationOperator http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/590ba4da/query_optimizer/ExecutionGenerator.cpp ---------------------------------------------------------------------- diff --git a/query_optimizer/ExecutionGenerator.cpp b/query_optimizer/ExecutionGenerator.cpp index 2e03e09..130134c 100644 --- a/query_optimizer/ExecutionGenerator.cpp +++ b/query_optimizer/ExecutionGenerator.cpp @@ -94,6 +94,7 @@ #include "relational_operators/CreateIndexOperator.hpp" #include "relational_operators/CreateTableOperator.hpp" #include "relational_operators/DeleteOperator.hpp" +#include "relational_operators/DestroyAggregationStateOperator.hpp" #include "relational_operators/DestroyHashOperator.hpp" #include "relational_operators/DropTableOperator.hpp" #include "relational_operators/FinalizeAggregationOperator.hpp" @@ -1464,6 +1465,15 @@ void ExecutionGenerator::convertAggregate( std::forward_as_tuple(finalize_aggregation_operator_index, output_relation)); temporary_relation_info_vec_.emplace_back(finalize_aggregation_operator_index, output_relation); + + const QueryPlan::DAGNodeIndex destroy_aggregation_state_operator_index = + execution_plan_->addRelationalOperator( + new DestroyAggregationStateOperator(query_handle_->query_id(), + aggr_state_index)); + + execution_plan_->addDirectDependency(destroy_aggregation_state_operator_index, + finalize_aggregation_operator_index, + true); } void ExecutionGenerator::convertSort(const P::SortPtr &physical_sort) { http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/590ba4da/relational_operators/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/relational_operators/CMakeLists.txt b/relational_operators/CMakeLists.txt index 43a42f9..cdfe309 100644 --- a/relational_operators/CMakeLists.txt +++ b/relational_operators/CMakeLists.txt @@ -33,6 +33,9 @@ add_library(quickstep_relationaloperators_AggregationOperator AggregationOperato add_library(quickstep_relationaloperators_BuildHashOperator BuildHashOperator.cpp BuildHashOperator.hpp) add_library(quickstep_relationaloperators_CreateIndexOperator CreateIndexOperator.cpp CreateIndexOperator.hpp) add_library(quickstep_relationaloperators_CreateTableOperator CreateTableOperator.cpp CreateTableOperator.hpp) +add_library(quickstep_relationaloperators_DestroyAggregationStateOperator + DestroyAggregationStateOperator.cpp + DestroyAggregationStateOperator.hpp) add_library(quickstep_relationaloperators_DeleteOperator DeleteOperator.cpp DeleteOperator.hpp) add_library(quickstep_relationaloperators_DestroyHashOperator DestroyHashOperator.cpp DestroyHashOperator.hpp) add_library(quickstep_relationaloperators_DropTableOperator DropTableOperator.cpp DropTableOperator.hpp) @@ -136,6 +139,16 @@ target_link_libraries(quickstep_relationaloperators_DeleteOperator quickstep_threading_ThreadIDBasedMap quickstep_utility_Macros tmb) +target_link_libraries(quickstep_relationaloperators_DestroyAggregationStateOperator + glog + quickstep_queryexecution_QueryContext + quickstep_queryexecution_WorkOrderProtosContainer + quickstep_queryexecution_WorkOrdersContainer + quickstep_relationaloperators_RelationalOperator + quickstep_relationaloperators_WorkOrder + quickstep_relationaloperators_WorkOrder_proto + quickstep_utility_Macros + tmb) target_link_libraries(quickstep_relationaloperators_DestroyHashOperator glog quickstep_queryexecution_QueryContext @@ -451,6 +464,7 @@ target_link_libraries(quickstep_relationaloperators_WorkOrderFactory quickstep_relationaloperators_AggregationOperator quickstep_relationaloperators_BuildHashOperator quickstep_relationaloperators_DeleteOperator + quickstep_relationaloperators_DestroyAggregationStateOperator quickstep_relationaloperators_DestroyHashOperator quickstep_relationaloperators_DropTableOperator quickstep_relationaloperators_FinalizeAggregationOperator @@ -484,6 +498,7 @@ target_link_libraries(quickstep_relationaloperators quickstep_relationaloperators_CreateIndexOperator quickstep_relationaloperators_CreateTableOperator quickstep_relationaloperators_DeleteOperator + quickstep_relationaloperators_DestroyAggregationStateOperator quickstep_relationaloperators_DestroyHashOperator quickstep_relationaloperators_DropTableOperator quickstep_relationaloperators_FinalizeAggregationOperator @@ -533,6 +548,7 @@ target_link_libraries(AggregationOperator_unittest quickstep_queryexecution_QueryExecutionTypedefs quickstep_queryexecution_WorkOrdersContainer quickstep_relationaloperators_AggregationOperator + quickstep_relationaloperators_DestroyAggregationStateOperator quickstep_relationaloperators_FinalizeAggregationOperator quickstep_relationaloperators_WorkOrder quickstep_storage_AggregationOperationState_proto http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/590ba4da/relational_operators/DestroyAggregationStateOperator.cpp ---------------------------------------------------------------------- diff --git a/relational_operators/DestroyAggregationStateOperator.cpp b/relational_operators/DestroyAggregationStateOperator.cpp new file mode 100644 index 0000000..62ca9e7 --- /dev/null +++ b/relational_operators/DestroyAggregationStateOperator.cpp @@ -0,0 +1,64 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + **/ + +#include "relational_operators/DestroyAggregationStateOperator.hpp" + +#include "query_execution/QueryContext.hpp" +#include "query_execution/WorkOrderProtosContainer.hpp" +#include "query_execution/WorkOrdersContainer.hpp" +#include "relational_operators/WorkOrder.pb.h" + +#include "tmb/id_typedefs.h" + +namespace quickstep { + +bool DestroyAggregationStateOperator::getAllWorkOrders( + WorkOrdersContainer *container, + QueryContext *query_context, + StorageManager *storage_manager, + const tmb::client_id scheduler_client_id, + tmb::MessageBus *bus) { + if (blocking_dependencies_met_ && !work_generated_) { + work_generated_ = true; + container->addNormalWorkOrder( + new DestroyAggregationStateWorkOrder(query_id_, aggr_state_index_, query_context), + op_index_); + } + return work_generated_; +} + +bool DestroyAggregationStateOperator::getAllWorkOrderProtos(WorkOrderProtosContainer *container) { + if (blocking_dependencies_met_ && !work_generated_) { + work_generated_ = true; + + serialization::WorkOrder *proto = new serialization::WorkOrder; + proto->set_work_order_type(serialization::DESTROY_AGGREGATION_STATE); + proto->set_query_id(query_id_); + proto->SetExtension(serialization::DestroyAggregationStateWorkOrder::aggr_state_index, aggr_state_index_); + + container->addWorkOrderProto(proto, op_index_); + } + return work_generated_; +} + +void DestroyAggregationStateWorkOrder::execute() { + query_context_->destroyAggregationState(aggr_state_index_); +} + +} // namespace quickstep http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/590ba4da/relational_operators/DestroyAggregationStateOperator.hpp ---------------------------------------------------------------------- diff --git a/relational_operators/DestroyAggregationStateOperator.hpp b/relational_operators/DestroyAggregationStateOperator.hpp new file mode 100644 index 0000000..bfb5ff1 --- /dev/null +++ b/relational_operators/DestroyAggregationStateOperator.hpp @@ -0,0 +1,120 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + **/ + +#ifndef QUICKSTEP_RELATIONAL_OPERATORS_DESTROY_AGGREGATION_STATE_OPERATOR_HPP_ +#define QUICKSTEP_RELATIONAL_OPERATORS_DESTROY_AGGREGATION_STATE_OPERATOR_HPP_ + +#include <string> + +#include "query_execution/QueryContext.hpp" +#include "relational_operators/RelationalOperator.hpp" +#include "relational_operators/WorkOrder.hpp" +#include "utility/Macros.hpp" + +#include "glog/logging.h" + +#include "tmb/id_typedefs.h" + +namespace tmb { class MessageBus; } + +namespace quickstep { + +class StorageManager; +class WorkOrderProtosContainer; +class WorkOrdersContainer; + +/** \addtogroup RelationalOperators + * @{ + */ + +/** + * @brief An operator which destroys a shared aggregation state. + **/ +class DestroyAggregationStateOperator : public RelationalOperator { + public: + /** + * @brief Constructor. + * + * @param query_id The ID of the query to which this operator belongs. + * @param aggr_state_index The index of the AggregationState in QueryContext. + **/ + DestroyAggregationStateOperator( + const std::size_t query_id, + const QueryContext::aggregation_state_id aggr_state_index) + : RelationalOperator(query_id), + aggr_state_index_(aggr_state_index), + work_generated_(false) {} + + ~DestroyAggregationStateOperator() override {} + + std::string getName() const override { + return "DestroyAggregationStateOperator"; + } + + bool getAllWorkOrders(WorkOrdersContainer *container, + QueryContext *query_context, + StorageManager *storage_manager, + const tmb::client_id scheduler_client_id, + tmb::MessageBus *bus) override; + + bool getAllWorkOrderProtos(WorkOrderProtosContainer *container) override; + + private: + const QueryContext::aggregation_state_id aggr_state_index_; + bool work_generated_; + + DISALLOW_COPY_AND_ASSIGN(DestroyAggregationStateOperator); +}; + +/** + * @brief A WorkOrder produced by DestroyAggregationStateOperator. + **/ +class DestroyAggregationStateWorkOrder : public WorkOrder { + public: + /** + * @brief Constructor. + * + * @param query_id The ID of the query to which this WorkOrder belongs. + * @param aggr_state_index The index of the AggregationState in QueryContext. + * @param query_context The QueryContext to use. + **/ + DestroyAggregationStateWorkOrder( + const std::size_t query_id, + const QueryContext::aggregation_state_id aggr_state_index, + QueryContext *query_context) + : WorkOrder(query_id), + aggr_state_index_(aggr_state_index), + query_context_(DCHECK_NOTNULL(query_context)) {} + + ~DestroyAggregationStateWorkOrder() override {} + + void execute() override; + + private: + const QueryContext::aggregation_state_id aggr_state_index_; + QueryContext *query_context_; + + DISALLOW_COPY_AND_ASSIGN(DestroyAggregationStateWorkOrder); +}; + +/** @} */ + +} // namespace quickstep + +#endif // QUICKSTEP_RELATIONAL_OPERATORS_DESTROY_AGGREGATION_STATE_OPERATOR_HPP_ http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/590ba4da/relational_operators/FinalizeAggregationOperator.cpp ---------------------------------------------------------------------- diff --git a/relational_operators/FinalizeAggregationOperator.cpp b/relational_operators/FinalizeAggregationOperator.cpp index 65e62c4..7e337de 100644 --- a/relational_operators/FinalizeAggregationOperator.cpp +++ b/relational_operators/FinalizeAggregationOperator.cpp @@ -44,7 +44,7 @@ bool FinalizeAggregationOperator::getAllWorkOrders( container->addNormalWorkOrder( new FinalizeAggregationWorkOrder( query_id_, - query_context->releaseAggregationState(aggr_state_index_), + query_context->getAggregationState(aggr_state_index_), query_context->getInsertDestination(output_destination_index_)), op_index_); } http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/590ba4da/relational_operators/FinalizeAggregationOperator.hpp ---------------------------------------------------------------------- diff --git a/relational_operators/FinalizeAggregationOperator.hpp b/relational_operators/FinalizeAggregationOperator.hpp index 7ac6712..0aeac2a 100644 --- a/relational_operators/FinalizeAggregationOperator.hpp +++ b/relational_operators/FinalizeAggregationOperator.hpp @@ -22,7 +22,6 @@ #include <cstddef> #include <string> -#include <memory> #include "catalog/CatalogRelation.hpp" #include "catalog/CatalogTypedefs.hpp" @@ -133,7 +132,7 @@ class FinalizeAggregationWorkOrder : public WorkOrder { void execute() override; private: - std::unique_ptr<AggregationOperationState> state_; + AggregationOperationState *state_; InsertDestination *output_destination_; DISALLOW_COPY_AND_ASSIGN(FinalizeAggregationWorkOrder); http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/590ba4da/relational_operators/WorkOrder.proto ---------------------------------------------------------------------- diff --git a/relational_operators/WorkOrder.proto b/relational_operators/WorkOrder.proto index 02aa50e..3eed379 100644 --- a/relational_operators/WorkOrder.proto +++ b/relational_operators/WorkOrder.proto @@ -42,6 +42,7 @@ enum WorkOrderType { TEXT_SCAN = 18; UPDATE = 19; WINDOW_AGGREGATION = 20; + DESTROY_AGGREGATION_STATE = 21; } message WorkOrder { @@ -253,3 +254,9 @@ message WindowAggregationWorkOrder { optional int32 insert_destination_index = 338; } } + +message DestroyAggregationStateWorkOrder { + extend WorkOrder { + optional uint32 aggr_state_index = 339; + } +} http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/590ba4da/relational_operators/WorkOrderFactory.cpp ---------------------------------------------------------------------- diff --git a/relational_operators/WorkOrderFactory.cpp b/relational_operators/WorkOrderFactory.cpp index 6970486..2356bab 100644 --- a/relational_operators/WorkOrderFactory.cpp +++ b/relational_operators/WorkOrderFactory.cpp @@ -30,6 +30,7 @@ #include "relational_operators/AggregationOperator.hpp" #include "relational_operators/BuildHashOperator.hpp" #include "relational_operators/DeleteOperator.hpp" +#include "relational_operators/DestroyAggregationStateOperator.hpp" #include "relational_operators/DestroyHashOperator.hpp" #include "relational_operators/DropTableOperator.hpp" #include "relational_operators/FinalizeAggregationOperator.hpp" @@ -116,6 +117,14 @@ WorkOrder* WorkOrderFactory::ReconstructFromProto(const serialization::WorkOrder shiftboss_client_id, bus); } + case serialization::DESTROY_AGGREGATION_STATE: { + LOG(INFO) << "Creating DestroyAggregationStateWorkOrder"; + return new DestroyAggregationStateWorkOrder( + proto.query_id(), + proto.GetExtension( + serialization::DestroyAggregationStateWorkOrder::aggr_state_index), + query_context); + } case serialization::DESTROY_HASH: { LOG(INFO) << "Creating DestroyHashWorkOrder"; return new DestroyHashWorkOrder( @@ -145,7 +154,7 @@ WorkOrder* WorkOrderFactory::ReconstructFromProto(const serialization::WorkOrder LOG(INFO) << "Creating FinalizeAggregationWorkOrder"; return new FinalizeAggregationWorkOrder( proto.query_id(), - query_context->releaseAggregationState(proto.GetExtension( + query_context->getAggregationState(proto.GetExtension( serialization::FinalizeAggregationWorkOrder::aggr_state_index)), query_context->getInsertDestination( proto.GetExtension(serialization::FinalizeAggregationWorkOrder:: @@ -489,6 +498,11 @@ bool WorkOrderFactory::ProtoIsValid(const serialization::WorkOrder &proto, proto.HasExtension(serialization::DeleteWorkOrder::block_id) && proto.HasExtension(serialization::DeleteWorkOrder::operator_index); } + case serialization::DESTROY_AGGREGATION_STATE: { + return proto.HasExtension(serialization::DestroyAggregationStateWorkOrder::aggr_state_index) && + query_context.isValidAggregationStateId( + proto.GetExtension(serialization::DestroyAggregationStateWorkOrder::aggr_state_index)); + } case serialization::DESTROY_HASH: { return proto.HasExtension(serialization::DestroyHashWorkOrder::join_hash_table_index) && query_context.isValidJoinHashTableId( http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/590ba4da/relational_operators/tests/AggregationOperator_unittest.cpp ---------------------------------------------------------------------- diff --git a/relational_operators/tests/AggregationOperator_unittest.cpp b/relational_operators/tests/AggregationOperator_unittest.cpp index 7a5b461..0138362 100644 --- a/relational_operators/tests/AggregationOperator_unittest.cpp +++ b/relational_operators/tests/AggregationOperator_unittest.cpp @@ -44,6 +44,7 @@ #include "query_execution/QueryExecutionTypedefs.hpp" #include "query_execution/WorkOrdersContainer.hpp" #include "relational_operators/AggregationOperator.hpp" +#include "relational_operators/DestroyAggregationStateOperator.hpp" #include "relational_operators/FinalizeAggregationOperator.hpp" #include "relational_operators/WorkOrder.hpp" #include "storage/AggregationOperationState.pb.h" @@ -292,6 +293,9 @@ class AggregationOperatorTest : public ::testing::Test { *result_table_, insert_destination_index)); + destroy_aggr_state_op_.reset( + new DestroyAggregationStateOperator(kQueryId, aggr_state_index)); + // Set up the QueryContext. query_context_.reset(new QueryContext(query_context_proto, *db_, @@ -304,6 +308,7 @@ class AggregationOperatorTest : public ::testing::Test { // class' checks about operator index are successful. op_->setOperatorIndex(kOpIndex); finalize_op_->setOperatorIndex(kOpIndex); + destroy_aggr_state_op_->setOperatorIndex(kOpIndex); } void setupTestGroupBy(const std::string &stem, @@ -379,6 +384,9 @@ class AggregationOperatorTest : public ::testing::Test { *result_table_, insert_destination_index)); + destroy_aggr_state_op_.reset( + new DestroyAggregationStateOperator(kQueryId, aggr_state_index)); + // Set up the QueryContext. query_context_.reset(new QueryContext(query_context_proto, *db_, @@ -391,6 +399,7 @@ class AggregationOperatorTest : public ::testing::Test { // class' checks about operator index are successful. op_->setOperatorIndex(kOpIndex); finalize_op_->setOperatorIndex(kOpIndex); + destroy_aggr_state_op_->setOperatorIndex(kOpIndex); } void execute() { @@ -423,6 +432,21 @@ class AggregationOperatorTest : public ::testing::Test { work_order->execute(); delete work_order; } + + destroy_aggr_state_op_->informAllBlockingDependenciesMet(); + + WorkOrdersContainer destroy_aggr_state_op_container(1, 0); + const std::size_t destroy_aggr_state_op_index = 0; + destroy_aggr_state_op_->getAllWorkOrders(&destroy_aggr_state_op_container, + query_context_.get(), + storage_manager_.get(), + foreman_client_id_, + &bus_); + while (destroy_aggr_state_op_container.hasNormalWorkOrder(destroy_aggr_state_op_index)) { + WorkOrder *work_order = destroy_aggr_state_op_container.getNormalWorkOrder(destroy_aggr_state_op_index); + work_order->execute(); + delete work_order; + } } template <class T> @@ -528,6 +552,7 @@ class AggregationOperatorTest : public ::testing::Test { std::unique_ptr<AggregationOperator> op_; std::unique_ptr<FinalizeAggregationOperator> finalize_op_; + std::unique_ptr<DestroyAggregationStateOperator> destroy_aggr_state_op_; }; const char AggregationOperatorTest::kDatabaseName[] = "database";