Initial commit
Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/97d8dca8 Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/97d8dca8 Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/97d8dca8 Branch: refs/heads/LIP-for-tpch Commit: 97d8dca8550e79a123d74545798a84321913ef41 Parents: aaecc76 Author: Jianqiao Zhu <jianq...@cs.wisc.edu> Authored: Sat Jun 11 23:14:00 2016 -0500 Committer: Jianqiao Zhu <jianq...@cs.wisc.edu> Committed: Sat Jul 30 01:34:59 2016 -0500 ---------------------------------------------------------------------- CMakeLists.txt | 2 + catalog/CMakeLists.txt | 9 + catalog/Catalog.proto | 5 + catalog/CatalogRelation.cpp | 16 +- catalog/CatalogRelationConstraints.cpp | 55 ++ catalog/CatalogRelationConstraints.hpp | 110 ++++ catalog/CatalogRelationSchema.cpp | 15 + catalog/CatalogRelationSchema.hpp | 16 +- cli/CommandExecutor.cpp | 25 +- cli/QuickstepCli.cpp | 69 +++ compression/CompressionDictionaryLite.hpp | 42 ++ query_execution/CMakeLists.txt | 1 + query_execution/QueryContext.cpp | 11 +- query_execution/Worker.cpp | 5 + .../tests/QueryManagerSingleNode_unittest.cpp | 4 + query_optimizer/CMakeLists.txt | 2 + query_optimizer/ExecutionGenerator.cpp | 87 ++-- query_optimizer/ExecutionGenerator.hpp | 2 +- query_optimizer/ExecutionHeuristics.cpp | 171 ++++--- query_optimizer/ExecutionHeuristics.hpp | 79 ++- query_optimizer/PhysicalGenerator.cpp | 5 +- query_optimizer/cost_model/SimpleCostModel.cpp | 4 +- .../cost_model/StarSchemaSimpleCostModel.cpp | 42 +- query_optimizer/expressions/ExpressionUtil.hpp | 8 +- query_optimizer/physical/Aggregate.cpp | 5 + query_optimizer/physical/Aggregate.hpp | 23 +- query_optimizer/physical/HashJoin.cpp | 27 + query_optimizer/physical/HashJoin.hpp | 23 +- query_optimizer/physical/Physical.hpp | 55 ++ query_optimizer/physical/Selection.cpp | 6 + query_optimizer/physical/Selection.hpp | 3 + 
query_optimizer/physical/TableReference.cpp | 18 + query_optimizer/physical/TableReference.hpp | 3 + query_optimizer/rules/AttachBloomFilters.cpp | 308 ++++++++++++ query_optimizer/rules/AttachBloomFilters.hpp | 118 +++++ query_optimizer/rules/CMakeLists.txt | 17 + .../StarSchemaHashJoinOrderOptimization.cpp | 277 ++++++---- .../StarSchemaHashJoinOrderOptimization.hpp | 100 ++-- .../tests/ExecutionHeuristics_unittest.cpp | 3 +- relational_operators/AggregationOperator.hpp | 4 + relational_operators/BuildHashOperator.hpp | 4 + relational_operators/CreateIndexOperator.hpp | 4 + relational_operators/CreateTableOperator.hpp | 4 + relational_operators/DeleteOperator.hpp | 4 + relational_operators/DestroyHashOperator.hpp | 4 + relational_operators/DropTableOperator.hpp | 4 + .../FinalizeAggregationOperator.hpp | 4 + relational_operators/HashJoinOperator.cpp | 10 + relational_operators/HashJoinOperator.hpp | 40 +- relational_operators/InsertOperator.hpp | 4 + .../NestedLoopsJoinOperator.hpp | 4 + relational_operators/RelationalOperator.hpp | 16 + relational_operators/SampleOperator.hpp | 4 + relational_operators/SaveBlocksOperator.hpp | 4 + relational_operators/SelectOperator.hpp | 4 + relational_operators/SortMergeRunOperator.hpp | 4 + .../SortRunGenerationOperator.hpp | 4 + relational_operators/TableGeneratorOperator.hpp | 4 + relational_operators/TextScanOperator.hpp | 4 + relational_operators/UpdateOperator.hpp | 4 + .../WindowAggregationOperator.hpp | 4 + relational_operators/WorkOrder.hpp | 11 +- storage/AggregationOperationState.cpp | 98 +++- storage/AggregationOperationState.hpp | 10 +- storage/AggregationOperationState.proto | 6 + storage/BasicColumnStoreValueAccessor.hpp | 26 +- storage/BloomFilterIndexSubBlock.cpp | 4 +- storage/BloomFilterIndexSubBlock.hpp | 6 - storage/CMakeLists.txt | 2 + storage/CompressedColumnStoreValueAccessor.hpp | 22 + .../CompressedPackedRowStoreValueAccessor.hpp | 22 + storage/HashTable.hpp | 185 ++++--- storage/HashTable.proto | 10 
+- storage/HashTableFactory.hpp | 23 +- storage/PackedRowStoreValueAccessor.hpp | 25 +- storage/SplitRowStoreValueAccessor.hpp | 45 ++ storage/StorageBlock.cpp | 28 +- storage/StorageBlock.hpp | 7 +- storage/ValueAccessor.hpp | 36 ++ types/containers/ColumnVector.hpp | 35 ++ types/containers/ColumnVectorsValueAccessor.hpp | 17 + utility/BloomFilter.hpp | 502 ++++++++++++++----- utility/BloomFilter.proto | 6 +- utility/BloomFilterAdapter.hpp | 142 ++++++ utility/CMakeLists.txt | 20 + utility/DAGVisualizer.cpp | 167 ++++++ utility/DAGVisualizer.hpp | 85 ++++ utility/DisjointTreeForest.hpp | 116 +++++ utility/EventProfiler.cpp | 29 ++ utility/EventProfiler.hpp | 188 +++++++ utility/PlanVisualizer.cpp | 42 +- 91 files changed, 3240 insertions(+), 588 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/97d8dca8/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/CMakeLists.txt b/CMakeLists.txt index 0bbde61..6a0c8b8 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -770,6 +770,8 @@ target_link_libraries(quickstep_cli_shell quickstep_queryoptimizer_QueryProcessor quickstep_storage_PreloaderThread quickstep_threading_ThreadIDBasedMap + quickstep_utility_DAGVisualizer + quickstep_utility_EventProfiler quickstep_utility_Macros quickstep_utility_PtrVector quickstep_utility_SqlError http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/97d8dca8/catalog/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/catalog/CMakeLists.txt b/catalog/CMakeLists.txt index 64b4f16..0f50706 100644 --- a/catalog/CMakeLists.txt +++ b/catalog/CMakeLists.txt @@ -35,6 +35,9 @@ add_library(quickstep_catalog_CatalogDatabaseCache CatalogDatabaseCache.cpp Cata add_library(quickstep_catalog_CatalogDatabaseLite ../empty_src.cpp CatalogDatabaseLite.hpp) add_library(quickstep_catalog_CatalogErrors 
../empty_src.cpp CatalogErrors.hpp) add_library(quickstep_catalog_CatalogRelation CatalogRelation.cpp CatalogRelation.hpp) +add_library(quickstep_catalog_CatalogRelationConstraints + CatalogRelationConstraints.cpp + CatalogRelationConstraints.hpp) add_library(quickstep_catalog_CatalogRelationSchema CatalogRelationSchema.cpp CatalogRelationSchema.hpp) @@ -117,6 +120,10 @@ target_link_libraries(quickstep_catalog_CatalogRelation quickstep_threading_SpinSharedMutex quickstep_utility_Macros quickstep_utility_PtrVector) +target_link_libraries(quickstep_catalog_CatalogRelationConstraints + quickstep_catalog_CatalogTypedefs + quickstep_catalog_Catalog_proto + quickstep_utility_Macros) target_link_libraries(quickstep_catalog_CatalogRelationStatistics quickstep_catalog_CatalogTypedefs quickstep_catalog_Catalog_proto @@ -136,6 +143,7 @@ target_link_libraries(quickstep_catalog_CatalogRelationSchema glog quickstep_catalog_CatalogAttribute quickstep_catalog_CatalogErrors + quickstep_catalog_CatalogRelationConstraints quickstep_catalog_CatalogTypedefs quickstep_catalog_Catalog_proto quickstep_types_Type @@ -182,6 +190,7 @@ target_link_libraries(quickstep_catalog quickstep_catalog_CatalogDatabaseLite quickstep_catalog_CatalogErrors quickstep_catalog_CatalogRelation + quickstep_catalog_CatalogRelationConstraints quickstep_catalog_CatalogRelationSchema quickstep_catalog_CatalogRelationStatistics quickstep_catalog_CatalogTypedefs http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/97d8dca8/catalog/Catalog.proto ---------------------------------------------------------------------- diff --git a/catalog/Catalog.proto b/catalog/Catalog.proto index ce4bc2e..a51172f 100644 --- a/catalog/Catalog.proto +++ b/catalog/Catalog.proto @@ -80,6 +80,10 @@ message IndexScheme { repeated IndexEntry index_entries = 1; } +message CatalogRelationConstraints { + repeated int32 primary_key = 1; +} + message CatalogRelationStatistics { optional fixed64 num_tuples = 1; @@ -96,6 +100,7 @@ 
message CatalogRelationSchema { required bool temporary = 3; repeated CatalogAttribute attributes = 4; + optional CatalogRelationConstraints constraints = 5; extensions 16 to max; } http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/97d8dca8/catalog/CatalogRelation.cpp ---------------------------------------------------------------------- diff --git a/catalog/CatalogRelation.cpp b/catalog/CatalogRelation.cpp index 01aebb5..682b6be 100644 --- a/catalog/CatalogRelation.cpp +++ b/catalog/CatalogRelation.cpp @@ -143,21 +143,7 @@ CatalogRelation::CatalogRelation(const serialization::CatalogRelationSchema &pro } serialization::CatalogRelationSchema CatalogRelation::getProto() const { - serialization::CatalogRelationSchema proto; - - proto.set_relation_id(id_); - proto.set_name(name_); - proto.set_temporary(temporary_); - - for (PtrVector<CatalogAttribute, true>::const_iterator it = attr_vec_.begin(); - it != attr_vec_.end(); - ++it) { - if (it.isNull()) { - proto.add_attributes(); - } else { - proto.add_attributes()->MergeFrom(it->getProto()); - } - } + serialization::CatalogRelationSchema proto = CatalogRelationSchema::getProto(); proto.MutableExtension(serialization::CatalogRelation::default_layout) ->MergeFrom(getDefaultStorageBlockLayout().getDescription()); http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/97d8dca8/catalog/CatalogRelationConstraints.cpp ---------------------------------------------------------------------- diff --git a/catalog/CatalogRelationConstraints.cpp b/catalog/CatalogRelationConstraints.cpp new file mode 100644 index 0000000..4525a98 --- /dev/null +++ b/catalog/CatalogRelationConstraints.cpp @@ -0,0 +1,55 @@ +/** + * Copyright 2016, Quickstep Research Group, Computer Sciences Department, + * University of Wisconsin—Madison. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + **/ + +#include "catalog/CatalogRelationConstraints.hpp" + +#include "catalog/Catalog.pb.h" + +namespace quickstep { + +CatalogRelationConstraints::CatalogRelationConstraints( + const serialization::CatalogRelationConstraints &proto) { + if (proto.primary_key_size() > 0) { + primary_key_.reset(new std::set<attribute_id>()); + for (std::size_t i = 0; i < proto.primary_key_size(); ++i) { + primary_key_->emplace(proto.primary_key(i)); + } + } +} + +serialization::CatalogRelationConstraints CatalogRelationConstraints::getProto() const { + serialization::CatalogRelationConstraints proto; + if (primary_key_ != nullptr) { + for (const auto attr_id : *primary_key_) { + proto.add_primary_key(attr_id); + } + } + return proto; +} + +bool CatalogRelationConstraints::ProtoIsValid( + const serialization::CatalogRelationConstraints &proto, + const std::size_t num_attributes) { + for (std::size_t i = 0; i < proto.primary_key_size(); ++i) { + if (proto.primary_key(i) >= num_attributes) { + return false; + } + } + return true; +} + +} // namespace quickstep http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/97d8dca8/catalog/CatalogRelationConstraints.hpp ---------------------------------------------------------------------- diff --git a/catalog/CatalogRelationConstraints.hpp b/catalog/CatalogRelationConstraints.hpp new file mode 100644 index 0000000..896c072 --- /dev/null +++ b/catalog/CatalogRelationConstraints.hpp @@ -0,0 +1,110 @@ +/** + * Copyright 2016, Quickstep Research Group, Computer Sciences Department, + * University of Wisconsin—Madison.
+ * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + **/ + +#ifndef QUICKSTEP_CATALOG_CATALOG_RELATION_CONSTRAINTS_HPP_ +#define QUICKSTEP_CATALOG_CATALOG_RELATION_CONSTRAINTS_HPP_ + +#include <algorithm> +#include <cstddef> +#include <memory> +#include <set> +#include <utility> + +#include "catalog/Catalog.pb.h" +#include "catalog/CatalogTypedefs.hpp" +#include "utility/Macros.hpp" + +#include "glog/logging.h" + +namespace quickstep { + +/** \addtogroup Catalog + * @{ + */ + +/** + * @brief Constraints on a catalog relation. + **/ +class CatalogRelationConstraints { + public: + /** + * @brief Constructor. + **/ + CatalogRelationConstraints() {} + + /** + * @brief Reconstruct a CatalogRelationConstraints object from its serialized + * Protocol Buffer form. + * + * @param proto The Protocol Buffer serialization of a CatalogRelationConstraints + * object, previously produced by getProto(). + **/ + explicit CatalogRelationConstraints(const serialization::CatalogRelationConstraints &proto); + + /** + * @brief Serialize the CatalogRelationConstraints object as Protocol Buffer. + * + * @return The Protocol Buffer representation of the CatalogRelationConstraints + * object. 
+ **/ + serialization::CatalogRelationConstraints getProto() const; + + static bool ProtoIsValid(const serialization::CatalogRelationConstraints &proto, + const std::size_t num_attributes); + + bool hasPrimaryKey() const { + return (primary_key_ != nullptr); + } + + const std::set<attribute_id>* getPrimaryKey() const { + return primary_key_.get(); + } + + template <typename IterableT> + void setPrimaryKey(IterableT &&primary_key) { + CHECK(!primary_key.empty()); + primary_key_.reset( + new std::set<attribute_id>(primary_key.begin(), primary_key.end())); + } + + void removePrimaryKey() { + primary_key_.reset(); + } + + bool impliesUniqueAttributes(const std::set<attribute_id> &attributes) const { + if (primary_key_ == nullptr) { + return false; + } + + std::vector<attribute_id> attr_intersection; + std::set_intersection(primary_key_->begin(), primary_key_->end(), + attributes.begin(), attributes.end(), + std::back_inserter(attr_intersection)); + return (attr_intersection.size() == primary_key_->size()); + } + + private: + std::unique_ptr<std::set<attribute_id>> primary_key_; + + DISALLOW_COPY_AND_ASSIGN(CatalogRelationConstraints); +}; + +/** @} */ + +} // namespace quickstep + +#endif // QUICKSTEP_CATALOG_CATALOG_RELATION_CONSTRAINTS_HPP_ http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/97d8dca8/catalog/CatalogRelationSchema.cpp ---------------------------------------------------------------------- diff --git a/catalog/CatalogRelationSchema.cpp b/catalog/CatalogRelationSchema.cpp index 97c834f..bf8217d 100644 --- a/catalog/CatalogRelationSchema.cpp +++ b/catalog/CatalogRelationSchema.cpp @@ -27,6 +27,7 @@ #include "catalog/Catalog.pb.h" #include "catalog/CatalogAttribute.hpp" #include "catalog/CatalogErrors.hpp" +#include "catalog/CatalogRelationConstraints.hpp" #include "catalog/CatalogTypedefs.hpp" #include "types/Type.hpp" #include "utility/PtrVector.hpp" @@ -70,6 +71,12 @@ CatalogRelationSchema::CatalogRelationSchema(const 
serialization::CatalogRelatio attr_vec_.push_back(nullptr); } } + + if (proto.has_constraints()) { + constraints_.reset(new CatalogRelationConstraints(proto.constraints())); + } else { + constraints_.reset(new CatalogRelationConstraints()); + } } bool CatalogRelationSchema::ProtoIsValid(const serialization::CatalogRelationSchema &proto) { @@ -84,6 +91,12 @@ bool CatalogRelationSchema::ProtoIsValid(const serialization::CatalogRelationSch } } + if (proto.has_constraints() + && !CatalogRelationConstraints::ProtoIsValid(proto.constraints(), + proto.attributes_size())) { + return false; + } + return true; } @@ -104,6 +117,8 @@ serialization::CatalogRelationSchema CatalogRelationSchema::getProto() const { } } + proto.mutable_constraints()->CopyFrom(constraints_->getProto()); + return proto; } http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/97d8dca8/catalog/CatalogRelationSchema.hpp ---------------------------------------------------------------------- diff --git a/catalog/CatalogRelationSchema.hpp b/catalog/CatalogRelationSchema.hpp index d773bc7..0c6c207 100644 --- a/catalog/CatalogRelationSchema.hpp +++ b/catalog/CatalogRelationSchema.hpp @@ -21,12 +21,14 @@ #define QUICKSTEP_CATALOG_CATALOG_RELATION_SCHEMA_HPP_ #include <cstddef> +#include <memory> #include <string> #include <unordered_map> #include <vector> #include "catalog/Catalog.pb.h" #include "catalog/CatalogAttribute.hpp" +#include "catalog/CatalogRelationConstraints.hpp" #include "catalog/CatalogTypedefs.hpp" #include "utility/Macros.hpp" #include "utility/PtrVector.hpp" @@ -427,6 +429,14 @@ class CatalogRelationSchema { return max_byte_lengths_; } + const CatalogRelationConstraints& getConstraints() const { + return *constraints_; + } + + CatalogRelationConstraints* getConstraintsMutable() { + return constraints_.get(); + } + protected: /** * @brief Create a new relation. 
@@ -456,7 +466,8 @@ class CatalogRelationSchema { min_variable_byte_length_excluding_nullable_(0), estimated_variable_byte_length_(0), current_nullable_attribute_index_(-1), - current_variable_length_attribute_index_(-1) { + current_variable_length_attribute_index_(-1), + constraints_(new CatalogRelationConstraints()) { } /** @@ -532,6 +543,9 @@ class CatalogRelationSchema { std::vector<int> variable_length_attribute_indices_; int current_variable_length_attribute_index_; + // Primary key, foreign keys, etc. + std::unique_ptr<CatalogRelationConstraints> constraints_; + private: friend class CatalogDatabase; http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/97d8dca8/cli/CommandExecutor.cpp ---------------------------------------------------------------------- diff --git a/cli/CommandExecutor.cpp b/cli/CommandExecutor.cpp index 8acfae8..5b302c0 100644 --- a/cli/CommandExecutor.cpp +++ b/cli/CommandExecutor.cpp @@ -251,7 +251,8 @@ inline TypedValue executeQueryForSingleResult( return value; } -void executeAnalyze(const tmb::client_id main_thread_client_id, +void executeAnalyze(const PtrVector<ParseString> *arguments, + const tmb::client_id main_thread_client_id, const tmb::client_id foreman_client_id, MessageBus *bus, QueryProcessor *query_processor, @@ -260,8 +261,19 @@ void executeAnalyze(const tmb::client_id main_thread_client_id, StorageManager *storage_manager = query_processor->getStorageManager(); std::unique_ptr<SqlParserWrapper> parser_wrapper(new SqlParserWrapper()); - std::vector<std::reference_wrapper<const CatalogRelation>> relations( - database.begin(), database.end()); + std::vector<std::reference_wrapper<const CatalogRelation>> relations; + if (arguments->size() == 0) { + relations.insert(relations.begin(), database.begin(), database.end()); + } else { + for (const auto &rel_name : *arguments) { + const CatalogRelation *rel = database.getRelationByName(rel_name.value()); + if (rel == nullptr) { + THROW_SQL_ERROR_AT(&rel_name) << "Table 
does not exist"; + } else { + relations.emplace_back(*rel); + } + } + } // Analyze each relation in the database. for (const CatalogRelation &relation : relations) { @@ -341,8 +353,11 @@ void executeCommand(const ParseStatement &statement, executeDescribeTable(arguments, catalog_database, out); } } else if (command_str == C::kAnalyzeCommand) { - executeAnalyze( - main_thread_client_id, foreman_client_id, bus, query_processor, out); + executeAnalyze(arguments, + main_thread_client_id, + foreman_client_id, + bus, + query_processor, out); } else { THROW_SQL_ERROR_AT(command.command()) << "Invalid Command"; } http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/97d8dca8/cli/QuickstepCli.cpp ---------------------------------------------------------------------- diff --git a/cli/QuickstepCli.cpp b/cli/QuickstepCli.cpp index 68a3599..8031dd3 100644 --- a/cli/QuickstepCli.cpp +++ b/cli/QuickstepCli.cpp @@ -52,6 +52,9 @@ typedef quickstep::LineReaderDumb LineReaderImpl; #include <gperftools/profiler.h> #endif +#include "catalog/CatalogDatabase.hpp" +#include "catalog/CatalogRelation.hpp" +#include "catalog/CatalogRelationConstraints.hpp" #include "cli/DefaultsConfigurator.hpp" #include "cli/InputParserUtil.hpp" #include "cli/PrintToScreen.hpp" @@ -75,6 +78,8 @@ typedef quickstep::LineReaderDumb LineReaderImpl; #include "storage/PreloaderThread.hpp" #include "threading/ThreadIDBasedMap.hpp" +#include "utility/DAGVisualizer.hpp" +#include "utility/EventProfiler.hpp" #include "utility/Macros.hpp" #include "utility/PtrVector.hpp" #include "utility/SqlError.hpp" @@ -89,6 +94,8 @@ typedef quickstep::LineReaderDumb LineReaderImpl; #include "tmb/message_bus.h" #include "tmb/message_style.h" +#include "google/protobuf/text_format.h" + namespace quickstep { class CatalogRelation; } @@ -185,9 +192,50 @@ DEFINE_string(profile_file_name, "", // To put things in perspective, the first run is, in my experiments, about 5-10 // times more expensive than the average run. 
That means the query needs to be // run at least a hundred times to make the impact of the first run small (< 5 %). +DEFINE_string(profile_output, "", + "Output file name for writing the profiled events."); +DEFINE_bool(visualize_dag, false, + "If true, visualize the execution plan DAG into a graph in DOT format."); } // namespace quickstep +void addPrimaryKeyInfoForTPCHTables(quickstep::CatalogDatabase *database) { + const std::vector<std::pair<std::string, std::vector<std::string>>> rel_pkeys = { + { "region", { "r_regionkey" } }, + { "nation", { "n_nationkey" } }, + { "supplier", { "s_suppkey" } }, + { "customer", { "c_custkey" } }, + { "part", { "p_partkey" } }, + { "partsupp", { "ps_partkey", "ps_suppkey" } }, + { "orders", { "o_orderkey" } } + }; + for (const auto &rel_pair : rel_pkeys) { + CatalogRelation *rel = database->getRelationByNameMutable(rel_pair.first); + std::vector<quickstep::attribute_id> attrs; + for (const auto &pkey : rel_pair.second) { + attrs.emplace_back(rel->getAttributeByName(pkey)->getID()); + } + rel->getConstraintsMutable()->setPrimaryKey(attrs); + } +} + +void addPrimaryKeyInfoForSSBTables(quickstep::CatalogDatabase *database) { + const std::vector<std::pair<std::string, std::vector<std::string>>> rel_pkeys = { + { "supplier", { "s_suppkey" } }, + { "customer", { "c_custkey" } }, + { "part", { "p_partkey" } }, + { "ddate", { "d_datekey" } } + }; + for (const auto &rel_pair : rel_pkeys) { + CatalogRelation *rel = database->getRelationByNameMutable(rel_pair.first); + std::vector<quickstep::attribute_id> attrs; + for (const auto &pkey : rel_pair.second) { + attrs.emplace_back(rel->getAttributeByName(pkey)->getID()); + } + rel->getConstraintsMutable()->setPrimaryKey(attrs); + } +} + int main(int argc, char* argv[]) { google::InitGoogleLogging(argv[0]); gflags::ParseCommandLineFlags(&argc, &argv, true); @@ -295,6 +343,15 @@ int main(int argc, char* argv[]) { LOG(FATAL) << "NON-STANDARD EXCEPTION DURING STARTUP"; } +// 
addPrimaryKeyInfoForTPCHTables(query_processor->getDefaultDatabase()); +// addPrimaryKeyInfoForSSBTables(query_processor->getDefaultDatabase()); +// std::string proto_str; +// google::protobuf::TextFormat::PrintToString( +// query_processor->getDefaultDatabase()->getProto(), &proto_str); +// std::cerr << proto_str << "\n"; +// query_processor->markCatalogAltered(); +// query_processor->saveCatalog(); + // Parse the CPU affinities for workers and the preloader thread, if enabled // to warm up the buffer pool. const vector<int> worker_cpu_affinities = @@ -434,6 +491,8 @@ int main(int argc, char* argv[]) { } DCHECK(query_handle->getQueryPlanMutable() != nullptr); + quickstep::simple_profiler.clear(); + quickstep::relop_profiler.clear(); start = std::chrono::steady_clock::now(); QueryExecutionUtil::ConstructAndSendAdmitRequestMessage( main_thread_client_id, @@ -446,6 +505,11 @@ int main(int argc, char* argv[]) { main_thread_client_id, &bus); end = std::chrono::steady_clock::now(); + if (quickstep::FLAGS_visualize_dag) { + quickstep::DAGVisualizer visualizer(*query_handle->getQueryPlanMutable()); + std::cerr << "\n" << visualizer.toDOT() << "\n"; + } + const CatalogRelation *query_result_relation = query_handle->getQueryResultRelation(); if (query_result_relation) { PrintToScreen::PrintRelation(*query_result_relation, @@ -471,6 +535,11 @@ int main(int argc, char* argv[]) { foreman.printWorkOrderProfilingResults(query_handle->query_id(), stdout); } + if (!quickstep::FLAGS_profile_output.empty()) { + std::ofstream ofs(quickstep::FLAGS_profile_output, std::ios::out); + quickstep::simple_profiler.writeToStream(ofs); + ofs.close(); + } } catch (const std::exception &e) { fprintf(stderr, "QUERY EXECUTION ERROR: %s\n", e.what()); break; http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/97d8dca8/compression/CompressionDictionaryLite.hpp ---------------------------------------------------------------------- diff --git a/compression/CompressionDictionaryLite.hpp 
b/compression/CompressionDictionaryLite.hpp index 45019c0..8c7741f 100644 --- a/compression/CompressionDictionaryLite.hpp +++ b/compression/CompressionDictionaryLite.hpp @@ -174,6 +174,15 @@ class CompressionDictionaryLite { } } + template <bool check_null = true> + inline std::pair<const void*, std::size_t> getUntypedValueAndByteLengthForCode(const std::uint32_t code) const { + if (type_is_variable_length_) { + return variableLengthGetUntypedValueAndByteLengthHelper<std::uint32_t, check_null>(code); + } else { + return fixedLengthGetUntypedValueAndByteLengthHelper<std::uint32_t, check_null>(code); + } + } + /** * @brief Get the value represented by the specified code as a TypedValue. * @note This version is for codes of 8 bits or less. Also see @@ -255,6 +264,39 @@ class CompressionDictionaryLite { return retval; } + template <typename CodeType, bool check_null = true> + inline std::pair<const void*, std::size_t> fixedLengthGetUntypedValueAndByteLengthHelper( + const CodeType code) const { + if (check_null && (code == getNullCode())) { + return std::make_pair(nullptr, 0); + } + DCHECK_LT(code, numberOfCodes()); + return std::make_pair(static_cast<const char*>(dictionary_memory_) + + 2 * sizeof(std::uint32_t) // Header. + + code * type_fixed_byte_length_, // Index into value array. 
+ type_fixed_byte_length_); + } + + template <typename CodeType, bool check_null = true> + inline std::pair<const void*, std::size_t> variableLengthGetUntypedValueAndByteLengthHelper( + const CodeType code) const { + if (check_null && (code == getNullCode())) { + return std::make_pair(nullptr, 0); + } + DCHECK_LT(code, numberOfCodes()); + + const std::uint32_t value_offset = static_cast<const std::uint32_t*>(dictionary_memory_)[code + 2]; + const void *data_ptr = variable_length_data_region_ + value_offset; + DCHECK_LT(data_ptr, static_cast<const char*>(dictionary_memory_) + dictionary_memory_size_); + + std::size_t data_size = (code == *static_cast<const std::uint32_t*>(dictionary_memory_) - 1) ? + (static_cast<const char*>(dictionary_memory_) + + dictionary_memory_size_ + - static_cast<const char*>(data_ptr)) + : (static_cast<const std::uint32_t*>(dictionary_memory_)[code + 3] - value_offset); + return std::make_pair(data_ptr, data_size); + } + template <typename CodeType> inline TypedValue fixedLengthGetTypedValueHelper(const CodeType code) const { if (code == getNullCode()) { http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/97d8dca8/query_execution/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/query_execution/CMakeLists.txt b/query_execution/CMakeLists.txt index 8bf1ab1..6b872c0 100644 --- a/query_execution/CMakeLists.txt +++ b/query_execution/CMakeLists.txt @@ -260,6 +260,7 @@ target_link_libraries(quickstep_queryexecution_Worker quickstep_threading_Thread quickstep_threading_ThreadIDBasedMap quickstep_threading_ThreadUtil + quickstep_utility_EventProfiler quickstep_utility_Macros tmb) target_link_libraries(quickstep_queryexecution_WorkerDirectory http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/97d8dca8/query_execution/QueryContext.cpp ---------------------------------------------------------------------- diff --git a/query_execution/QueryContext.cpp 
b/query_execution/QueryContext.cpp index 7019b6a..fd0ed08 100644 --- a/query_execution/QueryContext.cpp +++ b/query_execution/QueryContext.cpp @@ -61,15 +61,16 @@ QueryContext::QueryContext(const serialization::QueryContext &proto, << "Attempted to create QueryContext from an invalid proto description:\n" << proto.DebugString(); + for (int i = 0; i < proto.bloom_filters_size(); ++i) { + bloom_filters_.emplace_back(new BloomFilter(proto.bloom_filters(i))); + } + for (int i = 0; i < proto.aggregation_states_size(); ++i) { aggregation_states_.emplace_back( AggregationOperationState::ReconstructFromProto(proto.aggregation_states(i), database, - storage_manager)); - } - - for (int i = 0; i < proto.bloom_filters_size(); ++i) { - bloom_filters_.emplace_back(new BloomFilter(proto.bloom_filters(i))); + storage_manager, + bloom_filters_)); } for (int i = 0; i < proto.generator_functions_size(); ++i) { http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/97d8dca8/query_execution/Worker.cpp ---------------------------------------------------------------------- diff --git a/query_execution/Worker.cpp b/query_execution/Worker.cpp index 6ba27f1..f94089f 100644 --- a/query_execution/Worker.cpp +++ b/query_execution/Worker.cpp @@ -29,6 +29,7 @@ #include "relational_operators/WorkOrder.hpp" #include "threading/ThreadIDBasedMap.hpp" #include "threading/ThreadUtil.hpp" +#include "utility/EventProfiler.hpp" #include "glog/logging.h" @@ -116,8 +117,12 @@ void Worker::executeWorkOrderHelper(const TaggedMessage &tagged_message, const size_t query_id_for_workorder = worker_message.getWorkOrder()->getQueryID(); // Start measuring the execution time. 
+ auto *container = relop_profiler.getContainer(); + auto *line = container->getEventLine(worker_message.getRelationalOpIndex()); start = std::chrono::steady_clock::now(); + line->emplace_back(); worker_message.getWorkOrder()->execute(); + line->back().endEvent(); end = std::chrono::steady_clock::now(); delete worker_message.getWorkOrder(); const uint64_t execution_time_microseconds = http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/97d8dca8/query_execution/tests/QueryManagerSingleNode_unittest.cpp ---------------------------------------------------------------------- diff --git a/query_execution/tests/QueryManagerSingleNode_unittest.cpp b/query_execution/tests/QueryManagerSingleNode_unittest.cpp index 39ca58c..7c96e7f 100644 --- a/query_execution/tests/QueryManagerSingleNode_unittest.cpp +++ b/query_execution/tests/QueryManagerSingleNode_unittest.cpp @@ -104,6 +104,10 @@ class MockOperator: public RelationalOperator { num_calls_donefeedingblocks_(0) { } + std::string getName() const override { + return "MockOperator"; + } + #define MOCK_OP_LOG(x) VLOG(x) << "Op[" << op_index_ << "]: " << __func__ << ": " // The methods below are used to check whether QueryManager calls the Relational http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/97d8dca8/query_optimizer/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/query_optimizer/CMakeLists.txt b/query_optimizer/CMakeLists.txt index a56b714..e20ca7d 100644 --- a/query_optimizer/CMakeLists.txt +++ b/query_optimizer/CMakeLists.txt @@ -79,6 +79,7 @@ target_link_libraries(quickstep_queryoptimizer_ExecutionGenerator quickstep_queryoptimizer_QueryPlan quickstep_queryoptimizer_costmodel_CostModel quickstep_queryoptimizer_costmodel_SimpleCostModel + quickstep_queryoptimizer_costmodel_StarSchemaSimpleCostModel quickstep_queryoptimizer_expressions_AggregateFunction quickstep_queryoptimizer_expressions_Alias 
quickstep_queryoptimizer_expressions_AttributeReference @@ -197,6 +198,7 @@ target_link_libraries(quickstep_queryoptimizer_PhysicalGenerator quickstep_queryoptimizer_LogicalToPhysicalMapper quickstep_queryoptimizer_logical_Logical quickstep_queryoptimizer_physical_Physical + quickstep_queryoptimizer_rules_AttachBloomFilters quickstep_queryoptimizer_rules_PruneColumns quickstep_queryoptimizer_rules_StarSchemaHashJoinOrderOptimization quickstep_queryoptimizer_strategy_Aggregate http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/97d8dca8/query_optimizer/ExecutionGenerator.cpp ---------------------------------------------------------------------- diff --git a/query_optimizer/ExecutionGenerator.cpp b/query_optimizer/ExecutionGenerator.cpp index ce21ade..b8b4c58 100644 --- a/query_optimizer/ExecutionGenerator.cpp +++ b/query_optimizer/ExecutionGenerator.cpp @@ -57,6 +57,7 @@ #include "query_optimizer/QueryHandle.hpp" #include "query_optimizer/QueryPlan.hpp" #include "query_optimizer/cost_model/SimpleCostModel.hpp" +#include "query_optimizer/cost_model/StarSchemaSimpleCostModel.hpp" #include "query_optimizer/expressions/AggregateFunction.hpp" #include "query_optimizer/expressions/Alias.hpp" #include "query_optimizer/expressions/AttributeReference.hpp" @@ -165,6 +166,8 @@ void ExecutionGenerator::generatePlan(const P::PhysicalPtr &physical_plan) { cost_model_.reset( new cost::SimpleCostModel(top_level_physical_plan_->shared_subplans())); + star_schema_cost_model_.reset( + new cost::StarSchemaSimpleCostModel(top_level_physical_plan_->shared_subplans())); const CatalogRelation *result_relation = nullptr; @@ -598,8 +601,10 @@ void ExecutionGenerator::convertHashJoin(const P::HashJoinPtr &physical_plan) { std::vector<attribute_id> probe_attribute_ids; std::vector<attribute_id> build_attribute_ids; - std::vector<attribute_id> probe_original_attribute_ids; - std::vector<attribute_id> build_original_attribute_ids; + const P::BloomFilterConfig &bloom_filter_config = + 
physical_plan->bloom_filter_config(); + std::vector<attribute_id> probe_side_bloom_filter_attribute_ids; + std::vector<attribute_id> build_side_bloom_filter_attribute_ids; const CatalogRelation *referenced_stored_probe_relation = nullptr; const CatalogRelation *referenced_stored_build_relation = nullptr; @@ -612,18 +617,6 @@ void ExecutionGenerator::convertHashJoin(const P::HashJoinPtr &physical_plan) { const std::vector<E::AttributeReferencePtr> &left_join_attributes = physical_plan->left_join_attributes(); for (const E::AttributeReferencePtr &left_join_attribute : left_join_attributes) { - // Try to determine the original stored relation referenced in the Hash Join. - referenced_stored_probe_relation = - optimizer_context_->catalog_database()->getRelationByName(left_join_attribute->relation_name()); - if (referenced_stored_probe_relation == nullptr) { - // Hash Join optimizations are not possible, if the referenced relation cannot be determined. - skip_hash_join_optimization = true; - } else { - const attribute_id probe_operator_attribute_id = - referenced_stored_probe_relation->getAttributeByName(left_join_attribute->attribute_name())->getID(); - probe_original_attribute_ids.emplace_back(probe_operator_attribute_id); - } - const CatalogAttribute *probe_catalog_attribute = attribute_substitution_map_[left_join_attribute->id()]; probe_attribute_ids.emplace_back(probe_catalog_attribute->getID()); @@ -636,18 +629,6 @@ void ExecutionGenerator::convertHashJoin(const P::HashJoinPtr &physical_plan) { const std::vector<E::AttributeReferencePtr> &right_join_attributes = physical_plan->right_join_attributes(); for (const E::AttributeReferencePtr &right_join_attribute : right_join_attributes) { - // Try to determine the original stored relation referenced in the Hash Join. 
- referenced_stored_build_relation = - optimizer_context_->catalog_database()->getRelationByName(right_join_attribute->relation_name()); - if (referenced_stored_build_relation == nullptr) { - // Hash Join optimizations are not possible, if the referenced relation cannot be determined. - skip_hash_join_optimization = true; - } else { - const attribute_id build_operator_attribute_id = - referenced_stored_build_relation->getAttributeByName(right_join_attribute->attribute_name())->getID(); - build_original_attribute_ids.emplace_back(build_operator_attribute_id); - } - const CatalogAttribute *build_catalog_attribute = attribute_substitution_map_[right_join_attribute->id()]; build_attribute_ids.emplace_back(build_catalog_attribute->getID()); @@ -657,6 +638,20 @@ void ExecutionGenerator::convertHashJoin(const P::HashJoinPtr &physical_plan) { } } + for (const auto &bf : bloom_filter_config.probe_side_bloom_filters) { + const CatalogAttribute *probe_bf_catalog_attribute + = attribute_substitution_map_[bf.attribute->id()]; + probe_side_bloom_filter_attribute_ids.emplace_back( + probe_bf_catalog_attribute->getID()); + } + + for (const auto &bf : bloom_filter_config.build_side_bloom_filters) { + const CatalogAttribute *build_bf_catalog_attribute + = attribute_substitution_map_[bf.attribute->id()]; + build_side_bloom_filter_attribute_ids.emplace_back( + build_bf_catalog_attribute->getID()); + } + // Remember key types for call to SimplifyHashTableImplTypeProto() below. std::vector<const Type*> key_types; for (std::vector<E::AttributeReferencePtr>::size_type attr_idx = 0; @@ -671,22 +666,7 @@ void ExecutionGenerator::convertHashJoin(const P::HashJoinPtr &physical_plan) { key_types.push_back(&left_attribute_type); } - std::size_t probe_cardinality = cost_model_->estimateCardinality(probe_physical); std::size_t build_cardinality = cost_model_->estimateCardinality(build_physical); - // For inner join, we may swap the probe table and the build table. 
- if (physical_plan->join_type() == P::HashJoin::JoinType::kInnerJoin) { - // Choose the smaller table as the inner build table, - // and the other one as the outer probe table. - if (probe_cardinality < build_cardinality) { - // Switch the probe and build physical nodes. - std::swap(probe_physical, build_physical); - std::swap(probe_cardinality, build_cardinality); - std::swap(probe_attribute_ids, build_attribute_ids); - std::swap(any_probe_attributes_nullable, any_build_attributes_nullable); - std::swap(probe_original_attribute_ids, build_original_attribute_ids); - std::swap(referenced_stored_probe_relation, referenced_stored_build_relation); - } - } // Convert the residual predicate proto. QueryContext::predicate_id residual_predicate_index = QueryContext::kInvalidPredicateId; @@ -848,9 +828,11 @@ void ExecutionGenerator::convertHashJoin(const P::HashJoinPtr &physical_plan) { join_operator_index, referenced_stored_build_relation, referenced_stored_probe_relation, - std::move(build_original_attribute_ids), - std::move(probe_original_attribute_ids), - join_hash_table_index); + bloom_filter_config, + std::move(build_side_bloom_filter_attribute_ids), + std::move(probe_side_bloom_filter_attribute_ids), + join_hash_table_index, + star_schema_cost_model_->estimateCardinality(build_physical)); } } @@ -1364,6 +1346,16 @@ void ExecutionGenerator::convertAggregate( findRelationInfoOutputByPhysical(physical_plan->input()); aggr_state_proto->set_relation_id(input_relation_info->relation->getID()); + const P::BloomFilterConfig &bloom_filter_config = + physical_plan->bloom_filter_config(); + std::vector<attribute_id> bloom_filter_attribute_ids; + + for (const auto &bf : bloom_filter_config.probe_side_bloom_filters) { + const CatalogAttribute *bf_catalog_attribute + = attribute_substitution_map_[bf.attribute->id()]; + bloom_filter_attribute_ids.emplace_back(bf_catalog_attribute->getID()); + } + std::vector<const Type*> group_by_types; for (const E::NamedExpressionPtr 
&grouping_expression : physical_plan->grouping_expressions()) { unique_ptr<const Scalar> execution_group_by_expression; @@ -1478,6 +1470,13 @@ void ExecutionGenerator::convertAggregate( std::forward_as_tuple(finalize_aggregation_operator_index, output_relation)); temporary_relation_info_vec_.emplace_back(finalize_aggregation_operator_index, output_relation); + + if (FLAGS_optimize_joins) { + execution_heuristics_->addAggregateInfo(aggregation_operator_index, + bloom_filter_config, + std::move(bloom_filter_attribute_ids), + aggr_state_index); + } } void ExecutionGenerator::convertSort(const P::SortPtr &physical_sort) { http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/97d8dca8/query_optimizer/ExecutionGenerator.hpp ---------------------------------------------------------------------- diff --git a/query_optimizer/ExecutionGenerator.hpp b/query_optimizer/ExecutionGenerator.hpp index 9186707..14939ff 100644 --- a/query_optimizer/ExecutionGenerator.hpp +++ b/query_optimizer/ExecutionGenerator.hpp @@ -37,7 +37,6 @@ #include "query_optimizer/QueryHandle.hpp" #include "query_optimizer/QueryPlan.hpp" #include "query_optimizer/cost_model/CostModel.hpp" -#include "query_optimizer/cost_model/SimpleCostModel.hpp" #include "query_optimizer/expressions/ExprId.hpp" #include "query_optimizer/expressions/NamedExpression.hpp" #include "query_optimizer/expressions/Predicate.hpp" @@ -423,6 +422,7 @@ class ExecutionGenerator { * @brief The cost model to use for creating the execution plan. 
*/ std::unique_ptr<cost::CostModel> cost_model_; + std::unique_ptr<cost::CostModel> star_schema_cost_model_; physical::TopLevelPlanPtr top_level_physical_plan_; http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/97d8dca8/query_optimizer/ExecutionHeuristics.cpp ---------------------------------------------------------------------- diff --git a/query_optimizer/ExecutionHeuristics.cpp b/query_optimizer/ExecutionHeuristics.cpp index fc31c53..7d12745 100644 --- a/query_optimizer/ExecutionHeuristics.cpp +++ b/query_optimizer/ExecutionHeuristics.cpp @@ -25,6 +25,8 @@ #include "catalog/CatalogTypedefs.hpp" #include "query_execution/QueryContext.pb.h" #include "query_optimizer/QueryPlan.hpp" +#include "query_optimizer/physical/Physical.hpp" +#include "query_optimizer/physical/HashJoin.hpp" #include "utility/Macros.hpp" #include "glog/logging.h" @@ -32,95 +34,106 @@ namespace quickstep { namespace optimizer { +namespace E = ::quickstep::optimizer::expressions; +namespace P = ::quickstep::optimizer::physical; + +static const std::size_t kNumBitsPerByte = 8; +DEFINE_double(bloom_num_bits_per_tuple, kNumBitsPerByte, + "Number of bits per tuple used to size the Bloom filter."); + +DEFINE_int32(bloom_num_hash_fns, 1, + "Number of hash functions used in the Bloom filter."); + void ExecutionHeuristics::optimizeExecutionPlan(QueryPlan *query_plan, serialization::QueryContext *query_context_proto) { - // Currently this only optimizes left deep joins using bloom filters. - // It uses a simple algorithm to discover the left deep joins. - // It starts with the first hash join in the plan and keeps on iterating - // over the next hash joins, till a probe on a different relation id is found. - // The set of hash joins found in this way forms a chain and can be recognized - // as a left deep join. It becomes a candidate for optimization. 
- - // The optimization is done by modifying each of the build operators in the chain - // to generate a bloom filter on the build key during their hash table creation. - // The leaf-level probe operator is then modified to query all the bloom - // filters generated from all the build operators in the chain. These - // bloom filters are queried to test the membership of the probe key - // just prior to probing the hash table. - - QueryPlan::DAGNodeIndex origin_node = 0; - while (origin_node < hash_joins_.size() - 1) { - std::vector<std::size_t> chained_nodes; - chained_nodes.push_back(origin_node); - for (std::size_t i = origin_node + 1; i < hash_joins_.size(); ++i) { - const relation_id checked_relation_id = hash_joins_[origin_node].referenced_stored_probe_relation_->getID(); - const relation_id expected_relation_id = hash_joins_[i].referenced_stored_probe_relation_->getID(); - if (checked_relation_id == expected_relation_id) { - chained_nodes.push_back(i); - } else { - break; - } + std::map<std::pair<E::ExprId, P::PhysicalPtr>, + std::pair<QueryContext::bloom_filter_id, QueryPlan::DAGNodeIndex>> bloom_filter_map; + for (const auto &info : hash_joins_) { + auto *hash_table_proto = + query_context_proto->mutable_join_hash_tables(info.join_hash_table_id_); + const auto &bloom_filter_config = info.bloom_filter_config_; + + for (std::size_t i = 0; i < info.build_side_bloom_filter_ids_.size(); ++i) { + const QueryContext::bloom_filter_id bloom_filter_id = query_context_proto->bloom_filters_size(); + serialization::BloomFilter *bloom_filter_proto = query_context_proto->add_bloom_filters(); + setBloomFilterProperties(bloom_filter_proto, info.estimated_build_relation_cardinality_); + + const auto &build_side_bf = + bloom_filter_config.build_side_bloom_filters[i]; + bloom_filter_map.emplace( + std::make_pair(build_side_bf.attribute->id(), + bloom_filter_config.builder), + std::make_pair(bloom_filter_id, info.build_operator_index_)); + + auto *build_side_bloom_filter = 
hash_table_proto->add_build_side_bloom_filters(); + build_side_bloom_filter->set_bloom_filter_id(bloom_filter_id); + build_side_bloom_filter->set_attr_id(info.build_side_bloom_filter_ids_[i]); + std::cerr << "Build " << build_side_bf.attribute->toString() + << " @" << bloom_filter_config.builder << "\n"; } + } - // Only chains of length greater than one are suitable candidates for semi-join optimization. - if (chained_nodes.size() > 1) { - std::unordered_map<QueryContext::bloom_filter_id, std::vector<attribute_id>> probe_bloom_filter_info; - for (const std::size_t node : chained_nodes) { - // Provision for a new bloom filter to be used by the build operator. - const QueryContext::bloom_filter_id bloom_filter_id = query_context_proto->bloom_filters_size(); - serialization::BloomFilter *bloom_filter_proto = query_context_proto->add_bloom_filters(); - - // Modify the bloom filter properties based on the statistics of the relation. - setBloomFilterProperties(bloom_filter_proto, hash_joins_[node].referenced_stored_build_relation_); - - // Add build-side bloom filter information to the corresponding hash table proto. - query_context_proto->mutable_join_hash_tables(hash_joins_[node].join_hash_table_id_) - ->add_build_side_bloom_filter_id(bloom_filter_id); - - probe_bloom_filter_info.insert(std::make_pair(bloom_filter_id, hash_joins_[node].probe_attributes_)); - } - - // Add probe-side bloom filter information to the corresponding hash table proto for each build-side bloom filter. 
- for (const std::pair<QueryContext::bloom_filter_id, std::vector<attribute_id>> - &bloom_filter_info : probe_bloom_filter_info) { - auto *probe_side_bloom_filter = - query_context_proto->mutable_join_hash_tables(hash_joins_[origin_node].join_hash_table_id_) - ->add_probe_side_bloom_filters(); - probe_side_bloom_filter->set_probe_side_bloom_filter_id(bloom_filter_info.first); - for (const attribute_id &probe_attribute_id : bloom_filter_info.second) { - probe_side_bloom_filter->add_probe_side_attr_ids(probe_attribute_id); - } - } - - // Add node dependencies from chained build nodes to origin node probe. - for (std::size_t i = 1; i < chained_nodes.size(); ++i) { // Note: It starts from index 1. - query_plan->addDirectDependency(hash_joins_[origin_node].join_operator_index_, - hash_joins_[origin_node + i].build_operator_index_, - true /* is_pipeline_breaker */); - } + for (const auto &info : hash_joins_) { + auto *hash_table_proto = + query_context_proto->mutable_join_hash_tables(info.join_hash_table_id_); + const auto &bloom_filter_config = info.bloom_filter_config_; + + for (std::size_t i = 0; i < info.probe_side_bloom_filter_ids_.size(); ++i) { + auto *probe_side_bloom_filter = hash_table_proto->add_probe_side_bloom_filters(); + const auto &probe_side_bf = + bloom_filter_config.probe_side_bloom_filters[i]; + std::cerr << "HashJoin probe " << probe_side_bf.attribute->toString() + << " @" << probe_side_bf.builder << "\n"; + + const auto &build_side_info = + bloom_filter_map.at( + std::make_pair(probe_side_bf.source_attribute->id(), + probe_side_bf.builder)); + probe_side_bloom_filter->set_bloom_filter_id(build_side_info.first); + probe_side_bloom_filter->set_attr_id(info.probe_side_bloom_filter_ids_[i]); +// std::cerr << "HashJoin probe attr_id = " << info.probe_side_bloom_filter_ids_[i] << "\n"; + + query_plan->addDirectDependency(info.join_operator_index_, + build_side_info.second, + true /* is_pipeline_breaker */); } + } - // Update the origin node. 
- origin_node = chained_nodes.back() + 1; + for (const auto &info : aggregates_) { + auto *aggregate_proto = + query_context_proto->mutable_aggregation_states(info.aggregate_state_id_); + const auto &bloom_filter_config = info.bloom_filter_config_; + + for (std::size_t i = 0; i < info.bloom_filter_ids_.size(); ++i) { + auto *bloom_filter = aggregate_proto->add_bloom_filters(); + const auto &bf = + bloom_filter_config.probe_side_bloom_filters[i]; + std::cerr << "Aggregate probe " << bf.attribute->toString() + << " @" << bf.builder << "\n"; + + const auto &build_side_info = + bloom_filter_map.at( + std::make_pair(bf.source_attribute->id(), + bf.builder)); + bloom_filter->set_bloom_filter_id(build_side_info.first); + bloom_filter->set_attr_id(info.bloom_filter_ids_[i]); +// std::cerr << "Aggregate probe attr_id = " +// << info.bloom_filter_ids_[i] << "\n"; + + query_plan->addDirectDependency(info.aggregate_operator_index_, + build_side_info.second, + true /* is_pipeline_breaker */); + } } } void ExecutionHeuristics::setBloomFilterProperties(serialization::BloomFilter *bloom_filter_proto, - const CatalogRelation *relation) { - const std::size_t cardinality = relation->estimateTupleCardinality(); - if (cardinality < kOneThousand) { - bloom_filter_proto->set_bloom_filter_size(kOneThousand / kCompressionFactor); - bloom_filter_proto->set_number_of_hashes(kVeryLowSparsityHash); - } else if (cardinality < kTenThousand) { - bloom_filter_proto->set_bloom_filter_size(kTenThousand / kCompressionFactor); - bloom_filter_proto->set_number_of_hashes(kLowSparsityHash); - } else if (cardinality < kHundredThousand) { - bloom_filter_proto->set_bloom_filter_size(kHundredThousand / kCompressionFactor); - bloom_filter_proto->set_number_of_hashes(kMediumSparsityHash); - } else { - bloom_filter_proto->set_bloom_filter_size(kMillion / kCompressionFactor); - bloom_filter_proto->set_number_of_hashes(kHighSparsityHash); - } + const std::size_t cardinality) { + 
bloom_filter_proto->set_bloom_filter_size( + BloomFilter::getNearestAllowedSize( + (FLAGS_bloom_num_bits_per_tuple * cardinality) / kNumBitsPerByte)); +// std::cerr << "bf size = " << bloom_filter_proto->bloom_filter_size() << "\n"; + bloom_filter_proto->set_number_of_hashes(FLAGS_bloom_num_hash_fns); } } // namespace optimizer http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/97d8dca8/query_optimizer/ExecutionHeuristics.hpp ---------------------------------------------------------------------- diff --git a/query_optimizer/ExecutionHeuristics.hpp b/query_optimizer/ExecutionHeuristics.hpp index 92a7fe8..0755124 100644 --- a/query_optimizer/ExecutionHeuristics.hpp +++ b/query_optimizer/ExecutionHeuristics.hpp @@ -25,6 +25,7 @@ #include "query_execution/QueryContext.hpp" #include "query_execution/QueryContext.pb.h" #include "query_optimizer/QueryPlan.hpp" +#include "query_optimizer/physical/HashJoin.hpp" #include "utility/Macros.hpp" #include "glog/logging.h" @@ -49,7 +50,7 @@ class ExecutionHeuristics { static const std::size_t kHundredThousand = 100000; static const std::size_t kMillion = 1000000; - static const std::size_t kCompressionFactor = 10; + static const std::size_t kCompressionFactor = 1; static const std::size_t kVeryLowSparsityHash = 1; static const std::size_t kLowSparsityHash = 2; @@ -65,25 +66,48 @@ class ExecutionHeuristics { const QueryPlan::DAGNodeIndex join_operator_index, const CatalogRelation *referenced_stored_build_relation, const CatalogRelation *referenced_stored_probe_relation, - std::vector<attribute_id> &&build_attributes, - std::vector<attribute_id> &&probe_attributes, - const QueryContext::join_hash_table_id join_hash_table_id) + const physical::BloomFilterConfig &bloom_filter_config, + std::vector<attribute_id> &&build_side_bloom_filter_ids, + std::vector<attribute_id> &&probe_side_bloom_filter_ids, + const QueryContext::join_hash_table_id join_hash_table_id, + const std::size_t estimated_build_relation_cardinality) : 
build_operator_index_(build_operator_index), join_operator_index_(join_operator_index), referenced_stored_build_relation_(referenced_stored_build_relation), referenced_stored_probe_relation_(referenced_stored_probe_relation), - build_attributes_(std::move(build_attributes)), - probe_attributes_(std::move(probe_attributes)), - join_hash_table_id_(join_hash_table_id) { + bloom_filter_config_(bloom_filter_config), + build_side_bloom_filter_ids_(std::move(build_side_bloom_filter_ids)), + probe_side_bloom_filter_ids_(std::move(probe_side_bloom_filter_ids)), + join_hash_table_id_(join_hash_table_id), + estimated_build_relation_cardinality_(estimated_build_relation_cardinality) { } const QueryPlan::DAGNodeIndex build_operator_index_; const QueryPlan::DAGNodeIndex join_operator_index_; const CatalogRelation *referenced_stored_build_relation_; const CatalogRelation *referenced_stored_probe_relation_; - const std::vector<attribute_id> build_attributes_; - const std::vector<attribute_id> probe_attributes_; + const physical::BloomFilterConfig &bloom_filter_config_; + const std::vector<attribute_id> build_side_bloom_filter_ids_; + const std::vector<attribute_id> probe_side_bloom_filter_ids_; const QueryContext::join_hash_table_id join_hash_table_id_; + const std::size_t estimated_build_relation_cardinality_; + }; + + struct AggregateInfo { + AggregateInfo(const QueryPlan::DAGNodeIndex aggregate_operator_index, + const physical::BloomFilterConfig &bloom_filter_config, + std::vector<attribute_id> &&bloom_filter_ids, + const QueryContext::aggregation_state_id aggregate_state_id) + : aggregate_operator_index_(aggregate_operator_index), + bloom_filter_config_(bloom_filter_config), + bloom_filter_ids_(bloom_filter_ids), + aggregate_state_id_(aggregate_state_id) { + } + + const QueryPlan::DAGNodeIndex aggregate_operator_index_; + const physical::BloomFilterConfig &bloom_filter_config_; + const std::vector<attribute_id> bloom_filter_ids_; + const QueryContext::aggregation_state_id 
aggregate_state_id_; }; @@ -109,16 +133,30 @@ class ExecutionHeuristics { const QueryPlan::DAGNodeIndex join_operator_index, const CatalogRelation *referenced_stored_build_relation, const CatalogRelation *referenced_stored_probe_relation, - std::vector<attribute_id> &&build_attributes, - std::vector<attribute_id> &&probe_attributes, - const QueryContext::join_hash_table_id join_hash_table_id) { - hash_joins_.push_back(HashJoinInfo(build_operator_index, - join_operator_index, - referenced_stored_build_relation, - referenced_stored_probe_relation, - std::move(build_attributes), - std::move(probe_attributes), - join_hash_table_id)); + const physical::BloomFilterConfig &bloom_filter_config, + std::vector<attribute_id> &&build_side_bloom_filter_ids, + std::vector<attribute_id> &&probe_side_bloom_filter_ids, + const QueryContext::join_hash_table_id join_hash_table_id, + const std::size_t estimated_build_relation_cardinality) { + hash_joins_.emplace_back(build_operator_index, + join_operator_index, + referenced_stored_build_relation, + referenced_stored_probe_relation, + bloom_filter_config, + std::move(build_side_bloom_filter_ids), + std::move(probe_side_bloom_filter_ids), + join_hash_table_id, + estimated_build_relation_cardinality); + } + + inline void addAggregateInfo(const QueryPlan::DAGNodeIndex aggregate_operator_index, + const physical::BloomFilterConfig &bloom_filter_config, + std::vector<attribute_id> &&bloom_filter_ids, + const QueryContext::aggregation_state_id aggregate_state_id) { + aggregates_.emplace_back(aggregate_operator_index, + bloom_filter_config, + std::move(bloom_filter_ids), + aggregate_state_id); } /** @@ -139,10 +177,11 @@ class ExecutionHeuristics { * @param relation The catalog relation on which bloom filter is being built. 
**/ void setBloomFilterProperties(serialization::BloomFilter *bloom_filter_proto, - const CatalogRelation *relation); + const std::size_t cardinality); private: std::vector<HashJoinInfo> hash_joins_; + std::vector<AggregateInfo> aggregates_; DISALLOW_COPY_AND_ASSIGN(ExecutionHeuristics); }; http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/97d8dca8/query_optimizer/PhysicalGenerator.cpp ---------------------------------------------------------------------- diff --git a/query_optimizer/PhysicalGenerator.cpp b/query_optimizer/PhysicalGenerator.cpp index 75a7bc9..f73a546 100644 --- a/query_optimizer/PhysicalGenerator.cpp +++ b/query_optimizer/PhysicalGenerator.cpp @@ -26,6 +26,7 @@ #include "query_optimizer/Validator.hpp" #include "query_optimizer/logical/Logical.hpp" #include "query_optimizer/physical/Physical.hpp" +#include "query_optimizer/rules/AttachBloomFilters.hpp" #include "query_optimizer/rules/PruneColumns.hpp" #include "query_optimizer/rules/StarSchemaHashJoinOrderOptimization.hpp" #include "query_optimizer/strategy/Aggregate.hpp" @@ -95,9 +96,11 @@ P::PhysicalPtr PhysicalGenerator::generateInitialPlan( P::PhysicalPtr PhysicalGenerator::optimizePlan() { std::vector<std::unique_ptr<Rule<P::Physical>>> rules; if (FLAGS_reorder_hash_joins) { + rules.emplace_back(new PruneColumns()); rules.emplace_back(new StarSchemaHashJoinOrderOptimization()); } rules.emplace_back(new PruneColumns()); + rules.emplace_back(new AttachBloomFilters()); for (std::unique_ptr<Rule<P::Physical>> &rule : rules) { physical_plan_ = rule->apply(physical_plan_); @@ -108,7 +111,7 @@ P::PhysicalPtr PhysicalGenerator::optimizePlan() { DVLOG(4) << "Optimized physical plan:\n" << physical_plan_->toString(); if (FLAGS_visualize_plan) { - quickstep::PlanVisualizer plan_visualizer; + quickstep::PlanVisualizer plan_visualizer; std::cerr << "\n" << plan_visualizer.visualize(physical_plan_) << "\n"; } 
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/97d8dca8/query_optimizer/cost_model/SimpleCostModel.cpp ---------------------------------------------------------------------- diff --git a/query_optimizer/cost_model/SimpleCostModel.cpp b/query_optimizer/cost_model/SimpleCostModel.cpp index e5222ff..6794f21 100644 --- a/query_optimizer/cost_model/SimpleCostModel.cpp +++ b/query_optimizer/cost_model/SimpleCostModel.cpp @@ -88,7 +88,7 @@ std::size_t SimpleCostModel::estimateCardinalityForTopLevelPlan( std::size_t SimpleCostModel::estimateCardinalityForTableReference( const P::TableReferencePtr &physical_plan) { - return physical_plan->relation()->estimateTupleCardinality(); + return physical_plan->relation()->getStatistics().getNumTuples(); } std::size_t SimpleCostModel::estimateCardinalityForSelection( @@ -119,7 +119,7 @@ std::size_t SimpleCostModel::estimateCardinalityForAggregate( return 1; } return std::max(static_cast<std::size_t>(1), - estimateCardinality(physical_plan->input()) / 10); + estimateCardinality(physical_plan->input())); } std::size_t SimpleCostModel::estimateCardinalityForWindowAggregate( http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/97d8dca8/query_optimizer/cost_model/StarSchemaSimpleCostModel.cpp ---------------------------------------------------------------------- diff --git a/query_optimizer/cost_model/StarSchemaSimpleCostModel.cpp b/query_optimizer/cost_model/StarSchemaSimpleCostModel.cpp index badfeb1..ea21a2e 100644 --- a/query_optimizer/cost_model/StarSchemaSimpleCostModel.cpp +++ b/query_optimizer/cost_model/StarSchemaSimpleCostModel.cpp @@ -121,12 +121,26 @@ std::size_t StarSchemaSimpleCostModel::estimateCardinalityForTableGenerator( std::size_t StarSchemaSimpleCostModel::estimateCardinalityForHashJoin( const P::HashJoinPtr &physical_plan) { - std::size_t left_cardinality = estimateCardinality(physical_plan->left()); - std::size_t right_cardinality = estimateCardinality(physical_plan->right()); - double 
left_selectivity = estimateSelectivity(physical_plan->left()); - double right_selectivity = estimateSelectivity(physical_plan->right()); - return std::max(static_cast<std::size_t>(left_cardinality * right_selectivity) + 1, - static_cast<std::size_t>(right_cardinality * left_selectivity) + 1); + const P::PhysicalPtr &left_child = physical_plan->left(); + const P::PhysicalPtr &right_child = physical_plan->right(); + + std::size_t left_cardinality = estimateCardinality(left_child); + std::size_t right_cardinality = estimateCardinality(right_child); + + std::size_t estimated_cardinality = std::max(left_cardinality, right_cardinality); + if (left_child->impliesUniqueAttributes(physical_plan->left_join_attributes())) { + double left_selectivity = estimateSelectivity(left_child); + estimated_cardinality = + std::min(estimated_cardinality, + static_cast<std::size_t>(right_cardinality * left_selectivity)); + } + if (right_child->impliesUniqueAttributes(physical_plan->right_join_attributes())) { + double right_selectivity = estimateSelectivity(right_child); + estimated_cardinality = + std::min(estimated_cardinality, + static_cast<std::size_t>(left_cardinality * right_selectivity)); + } + return estimated_cardinality; } std::size_t StarSchemaSimpleCostModel::estimateCardinalityForNestedLoopsJoin( @@ -141,7 +155,7 @@ std::size_t StarSchemaSimpleCostModel::estimateCardinalityForAggregate( return 1; } return std::max(static_cast<std::size_t>(1), - estimateCardinality(physical_plan->input()) / 10); + estimateCardinality(physical_plan->input()) / 100); } std::size_t StarSchemaSimpleCostModel::estimateCardinalityForWindowAggregate( @@ -159,8 +173,14 @@ double StarSchemaSimpleCostModel::estimateSelectivity( case P::PhysicalType::kHashJoin: { const P::HashJoinPtr &hash_join = std::static_pointer_cast<const P::HashJoin>(physical_plan); - return std::min(estimateSelectivity(hash_join->left()), - estimateSelectivity(hash_join->right())); + double left_selectivity = 
estimateSelectivity(hash_join->left()); + double right_selectivity = estimateSelectivity(hash_join->right()); + double min_sel = std::min(left_selectivity, right_selectivity); + double max_sel = std::max(left_selectivity, right_selectivity); + if (max_sel < 1) { + min_sel *= std::max(max_sel, 0.9); + } + return min_sel; } case P::PhysicalType::kNestedLoopsJoin: { const P::NestedLoopsJoinPtr &nested_loop_join = @@ -213,7 +233,7 @@ double StarSchemaSimpleCostModel::estimateSelectivityForPredicate( case E::ExpressionType::kComparisonExpression: { // Case 1 - Number of distinct values statistics available // Case 1.1 - Equality comparison: 1.0 / num_distinct_values - // Case 1.2 - Otherwise: 5.0 / num_distinct_values + // Case 1.2 - Otherwise: 0.5 // Case 2 - Number of distinct values statistics not available // Case 2.1 - Equality comparison: 0.1 // Case 2.2 - Otherwise: 0.5 @@ -229,7 +249,7 @@ double StarSchemaSimpleCostModel::estimateSelectivityForPredicate( double unit_selectivity = 1.0 / it->second; return comparison_expression->isEqualityComparisonPredicate() ? unit_selectivity - : std::min(0.5, unit_selectivity * 5.0); + : 0.5; } } http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/97d8dca8/query_optimizer/expressions/ExpressionUtil.hpp ---------------------------------------------------------------------- diff --git a/query_optimizer/expressions/ExpressionUtil.hpp b/query_optimizer/expressions/ExpressionUtil.hpp index 4c35719..5e9d29d 100644 --- a/query_optimizer/expressions/ExpressionUtil.hpp +++ b/query_optimizer/expressions/ExpressionUtil.hpp @@ -103,12 +103,12 @@ bool ContainsExpression( * contain the other operand). * @return True if \p left is a subset of \p right. 
*/ -template <class NamedExpressionType> +template <class NamedExpressionType1, class NamedExpressionType2> bool SubsetOfExpressions( - const std::vector<std::shared_ptr<const NamedExpressionType>> &left, - const std::vector<std::shared_ptr<const NamedExpressionType>> &right) { + const std::vector<std::shared_ptr<const NamedExpressionType1>> &left, + const std::vector<std::shared_ptr<const NamedExpressionType2>> &right) { UnorderedNamedExpressionSet supset(right.begin(), right.end()); - for (const std::shared_ptr<const NamedExpressionType> &expr : left) { + for (const std::shared_ptr<const NamedExpressionType1> &expr : left) { if (supset.find(expr) == supset.end()) { return false; } http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/97d8dca8/query_optimizer/physical/Aggregate.cpp ---------------------------------------------------------------------- diff --git a/query_optimizer/physical/Aggregate.cpp b/query_optimizer/physical/Aggregate.cpp index c582bba..969daa7 100644 --- a/query_optimizer/physical/Aggregate.cpp +++ b/query_optimizer/physical/Aggregate.cpp @@ -87,6 +87,11 @@ std::vector<E::AttributeReferencePtr> Aggregate::getReferencedAttributes() return referenced_attributes; } +bool Aggregate::impliesUniqueAttributes( + const std::vector<expressions::AttributeReferencePtr> &attributes) const { + return E::SubsetOfExpressions(grouping_expressions_, attributes); +} + void Aggregate::getFieldStringItems( std::vector<std::string> *inline_field_names, std::vector<std::string> *inline_field_values, http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/97d8dca8/query_optimizer/physical/Aggregate.hpp ---------------------------------------------------------------------- diff --git a/query_optimizer/physical/Aggregate.hpp b/query_optimizer/physical/Aggregate.hpp index 2c2aee7..b40997c 100644 --- a/query_optimizer/physical/Aggregate.hpp +++ b/query_optimizer/physical/Aggregate.hpp @@ -98,6 +98,13 @@ class Aggregate : public Physical { return 
false; } + bool impliesUniqueAttributes( + const std::vector<expressions::AttributeReferencePtr> &attributes) const override; + + const BloomFilterConfig &bloom_filter_config() const { + return bloom_filter_config_; + } + /** * @brief Creates an Aggregate physical node. * @@ -111,9 +118,14 @@ class Aggregate : public Physical { PhysicalPtr input, const std::vector<expressions::NamedExpressionPtr> &grouping_expressions, const std::vector<expressions::AliasPtr> &aggregate_expressions, - const expressions::PredicatePtr &filter_predicate) { + const expressions::PredicatePtr &filter_predicate, + const BloomFilterConfig bloom_filter_config = BloomFilterConfig()) { return AggregatePtr( - new Aggregate(input, grouping_expressions, aggregate_expressions, filter_predicate)); + new Aggregate(input, + grouping_expressions, + aggregate_expressions, + filter_predicate, + bloom_filter_config)); } protected: @@ -130,11 +142,13 @@ class Aggregate : public Physical { PhysicalPtr input, const std::vector<expressions::NamedExpressionPtr> &grouping_expressions, const std::vector<expressions::AliasPtr> &aggregate_expressions, - const expressions::PredicatePtr &filter_predicate) + const expressions::PredicatePtr &filter_predicate, + const BloomFilterConfig &bloom_filter_config) : input_(input), grouping_expressions_(grouping_expressions), aggregate_expressions_(aggregate_expressions), - filter_predicate_(filter_predicate) { + filter_predicate_(filter_predicate), + bloom_filter_config_(bloom_filter_config) { addChild(input_); } @@ -142,6 +156,7 @@ class Aggregate : public Physical { std::vector<expressions::NamedExpressionPtr> grouping_expressions_; std::vector<expressions::AliasPtr> aggregate_expressions_; expressions::PredicatePtr filter_predicate_; + BloomFilterConfig bloom_filter_config_; DISALLOW_COPY_AND_ASSIGN(Aggregate); }; http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/97d8dca8/query_optimizer/physical/HashJoin.cpp 
---------------------------------------------------------------------- diff --git a/query_optimizer/physical/HashJoin.cpp b/query_optimizer/physical/HashJoin.cpp index 71c3692..f0e72e8 100644 --- a/query_optimizer/physical/HashJoin.cpp +++ b/query_optimizer/physical/HashJoin.cpp @@ -85,6 +85,15 @@ bool HashJoin::maybeCopyWithPrunedExpressions( return false; } +bool HashJoin::impliesUniqueAttributes( + const std::vector<expressions::AttributeReferencePtr> &attributes) const { + return (left()->impliesUniqueAttributes(left_join_attributes_) + && right()->impliesUniqueAttributes(attributes)) + || (right()->impliesUniqueAttributes(right_join_attributes_) + && left()->impliesUniqueAttributes(attributes)); + +} + void HashJoin::getFieldStringItems( std::vector<std::string> *inline_field_names, std::vector<std::string> *inline_field_values, @@ -106,6 +115,24 @@ void HashJoin::getFieldStringItems( container_child_fields->push_back(CastSharedPtrVector<OptimizerTreeBase>(left_join_attributes_)); container_child_field_names->push_back("right_join_attributes"); container_child_fields->push_back(CastSharedPtrVector<OptimizerTreeBase>(right_join_attributes_)); + + if (!bloom_filter_config_.build_side_bloom_filters.empty()) { + container_child_field_names->push_back("build_side_bloom_filters"); + container_child_fields->emplace_back(); + auto &container = container_child_fields->back(); + for (const auto& bf : bloom_filter_config_.build_side_bloom_filters) { + container.emplace_back(bf.attribute); + } + } + + if (!bloom_filter_config_.probe_side_bloom_filters.empty()) { + container_child_field_names->push_back("probe_side_bloom_filters"); + container_child_fields->emplace_back(); + auto &container = container_child_fields->back(); + for (const auto& bf : bloom_filter_config_.probe_side_bloom_filters) { + container.emplace_back(bf.attribute); + } + } } } // namespace physical 
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/97d8dca8/query_optimizer/physical/HashJoin.hpp ---------------------------------------------------------------------- diff --git a/query_optimizer/physical/HashJoin.hpp b/query_optimizer/physical/HashJoin.hpp index b904b5f..104cb52 100644 --- a/query_optimizer/physical/HashJoin.hpp +++ b/query_optimizer/physical/HashJoin.hpp @@ -115,7 +115,8 @@ class HashJoin : public BinaryJoin { right_join_attributes_, residual_predicate_, project_expressions(), - join_type_); + join_type_, + bloom_filter_config_); } std::vector<expressions::AttributeReferencePtr> getReferencedAttributes() const override; @@ -124,6 +125,13 @@ class HashJoin : public BinaryJoin { const expressions::UnorderedNamedExpressionSet &referenced_expressions, PhysicalPtr *output) const override; + bool impliesUniqueAttributes( + const std::vector<expressions::AttributeReferencePtr> &attributes) const override; + + const BloomFilterConfig &bloom_filter_config() const { + return bloom_filter_config_; + } + /** * @brief Creates a physical HashJoin. The left/right operand does not correspond to * probe/build operand. 
@@ -144,7 +152,8 @@ class HashJoin : public BinaryJoin { const std::vector<expressions::AttributeReferencePtr> &right_join_attributes, const expressions::PredicatePtr &residual_predicate, const std::vector<expressions::NamedExpressionPtr> &project_expressions, - const JoinType join_type) { + const JoinType join_type, + const BloomFilterConfig bloom_filter_config = BloomFilterConfig()) { return HashJoinPtr( new HashJoin(left, right, @@ -152,7 +161,8 @@ class HashJoin : public BinaryJoin { right_join_attributes, residual_predicate, project_expressions, - join_type)); + join_type, + bloom_filter_config)); } protected: @@ -172,18 +182,21 @@ class HashJoin : public BinaryJoin { const std::vector<expressions::AttributeReferencePtr> &right_join_attributes, const expressions::PredicatePtr &residual_predicate, const std::vector<expressions::NamedExpressionPtr> &project_expressions, - const JoinType join_type) + const JoinType join_type, + const BloomFilterConfig &bloom_filter_config) : BinaryJoin(left, right, project_expressions), left_join_attributes_(left_join_attributes), right_join_attributes_(right_join_attributes), residual_predicate_(residual_predicate), - join_type_(join_type) { + join_type_(join_type), + bloom_filter_config_(bloom_filter_config) { } std::vector<expressions::AttributeReferencePtr> left_join_attributes_; std::vector<expressions::AttributeReferencePtr> right_join_attributes_; expressions::PredicatePtr residual_predicate_; JoinType join_type_; + BloomFilterConfig bloom_filter_config_; DISALLOW_COPY_AND_ASSIGN(HashJoin); }; http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/97d8dca8/query_optimizer/physical/Physical.hpp ---------------------------------------------------------------------- diff --git a/query_optimizer/physical/Physical.hpp b/query_optimizer/physical/Physical.hpp index 9fdbeb5..389cd05 100644 --- a/query_optimizer/physical/Physical.hpp +++ b/query_optimizer/physical/Physical.hpp @@ -39,6 +39,56 @@ namespace physical { class 
Physical; typedef std::shared_ptr<const Physical> PhysicalPtr; +struct BloomFilterConfig { + struct BuildSide { + BuildSide(const expressions::AttributeReferencePtr &attribute_in) + : attribute(attribute_in) { + } + expressions::AttributeReferencePtr attribute; + }; + struct ProbeSide { + ProbeSide(const expressions::AttributeReferencePtr &attribute_in, + const expressions::AttributeReferencePtr &source_attribute_in, + const physical::PhysicalPtr &builder_in) + : attribute(attribute_in), + source_attribute(source_attribute_in), + builder(builder_in) { + } + expressions::AttributeReferencePtr attribute; + expressions::AttributeReferencePtr source_attribute; + PhysicalPtr builder; + }; + BloomFilterConfig() {} + BloomFilterConfig(const PhysicalPtr &builder_in) + : builder(builder_in) { + } + BloomFilterConfig(const PhysicalPtr &builder_in, + const std::vector<BuildSide> &build_side_bloom_filters_in, + const std::vector<ProbeSide> &probe_side_bloom_filters_in) + : builder(builder_in), + build_side_bloom_filters(build_side_bloom_filters_in), + probe_side_bloom_filters(probe_side_bloom_filters_in) { + } + void addBuildSideBloomFilter(const expressions::AttributeReferencePtr &attribute_in) { + for (const auto &build_bf : build_side_bloom_filters) { + if (attribute_in == build_bf.attribute) { + return; + } + } + build_side_bloom_filters.emplace_back(attribute_in); + } + void addProbeSideBloomFilter(const expressions::AttributeReferencePtr &attribute_in, + const expressions::AttributeReferencePtr &source_attribute_in, + const physical::PhysicalPtr &builder_in) { + probe_side_bloom_filters.emplace_back(attribute_in, + source_attribute_in, + builder_in); + } + PhysicalPtr builder; + std::vector<BuildSide> build_side_bloom_filters; + std::vector<ProbeSide> probe_side_bloom_filters; +}; + /** * @brief Base class for physical plan nodes. 
*/ @@ -84,6 +134,11 @@ class Physical : public OptimizerTree<Physical> { const expressions::UnorderedNamedExpressionSet &referenced_expressions, PhysicalPtr *output) const = 0; + virtual bool impliesUniqueAttributes( + const std::vector<expressions::AttributeReferencePtr> &attributes) const { + return false; + } + protected: /** * @brief Constructor. http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/97d8dca8/query_optimizer/physical/Selection.cpp ---------------------------------------------------------------------- diff --git a/query_optimizer/physical/Selection.cpp b/query_optimizer/physical/Selection.cpp index 5e1a03f..f4cdd1a 100644 --- a/query_optimizer/physical/Selection.cpp +++ b/query_optimizer/physical/Selection.cpp @@ -80,6 +80,12 @@ bool Selection::maybeCopyWithPrunedExpressions( return false; } +bool Selection::impliesUniqueAttributes( + const std::vector<expressions::AttributeReferencePtr> &attributes) const { + return input()->impliesUniqueAttributes(attributes); +} + + void Selection::getFieldStringItems( std::vector<std::string> *inline_field_names, std::vector<std::string> *inline_field_values, http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/97d8dca8/query_optimizer/physical/Selection.hpp ---------------------------------------------------------------------- diff --git a/query_optimizer/physical/Selection.hpp b/query_optimizer/physical/Selection.hpp index d8c1319..68cae65 100644 --- a/query_optimizer/physical/Selection.hpp +++ b/query_optimizer/physical/Selection.hpp @@ -84,6 +84,9 @@ class Selection : public Physical { const expressions::UnorderedNamedExpressionSet &referenced_attributes, PhysicalPtr *output) const override; + bool impliesUniqueAttributes( + const std::vector<expressions::AttributeReferencePtr> &attributes) const override; + /** * @brief Creates a Selection. 
* http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/97d8dca8/query_optimizer/physical/TableReference.cpp ---------------------------------------------------------------------- diff --git a/query_optimizer/physical/TableReference.cpp b/query_optimizer/physical/TableReference.cpp index 4a66ddf..bc73046 100644 --- a/query_optimizer/physical/TableReference.cpp +++ b/query_optimizer/physical/TableReference.cpp @@ -18,6 +18,7 @@ #include "query_optimizer/physical/TableReference.hpp" #include <string> +#include <set> #include <vector> #include "catalog/CatalogRelation.hpp" @@ -30,6 +31,23 @@ namespace physical { namespace E = ::quickstep::optimizer::expressions; +bool TableReference::impliesUniqueAttributes( + const std::vector<expressions::AttributeReferencePtr> &attributes) const { + std::set<E::ExprId> attr_ids; + for (const auto &attr : attributes) { + attr_ids.emplace(attr->id()); + } + + std::set<attribute_id> rel_attr_ids; + for (std::size_t i = 0; i < attribute_list_.size(); ++i) { + if (attr_ids.find(attribute_list_[i]->id()) != attr_ids.end()) { + rel_attr_ids.emplace(i); + } + } + + return relation_->getConstraints().impliesUniqueAttributes(rel_attr_ids); +} + void TableReference::getFieldStringItems( std::vector<std::string> *inline_field_names, std::vector<std::string> *inline_field_values, http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/97d8dca8/query_optimizer/physical/TableReference.hpp ---------------------------------------------------------------------- diff --git a/query_optimizer/physical/TableReference.hpp b/query_optimizer/physical/TableReference.hpp index bde9b97..bc07043 100644 --- a/query_optimizer/physical/TableReference.hpp +++ b/query_optimizer/physical/TableReference.hpp @@ -88,6 +88,9 @@ class TableReference : public Physical { return false; } + bool impliesUniqueAttributes( + const std::vector<expressions::AttributeReferencePtr> &attributes) const override; + /** * @brief Creates a TableReference. *