Repository: incubator-quickstep Updated Branches: refs/heads/reorder-partitioned-hash-join 6afdbbf76 -> 238b2d66a (forced update)
Added limited optimizer support for Partitioned Hash Joins. Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/238b2d66 Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/238b2d66 Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/238b2d66 Branch: refs/heads/reorder-partitioned-hash-join Commit: 238b2d66ab503b4991e2f8eb1bf2193b3355633d Parents: c53a4d0 Author: Zuyu Zhang <zu...@apache.org> Authored: Wed Jan 25 01:49:28 2017 -0800 Committer: Zuyu Zhang <zu...@apache.org> Committed: Tue Mar 7 00:16:46 2017 -0800 ---------------------------------------------------------------------- query_execution/QueryContext.cpp | 2 + query_optimizer/ExecutionGenerator.cpp | 368 ++++++++++++++++--- query_optimizer/ExecutionGenerator.hpp | 7 +- .../tests/execution_generator/Partition.test | 71 +++- types/TypedValue.hpp | 19 + 5 files changed, 406 insertions(+), 61 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/238b2d66/query_execution/QueryContext.cpp ---------------------------------------------------------------------- diff --git a/query_execution/QueryContext.cpp b/query_execution/QueryContext.cpp index 71839a7..3681a3b 100644 --- a/query_execution/QueryContext.cpp +++ b/query_execution/QueryContext.cpp @@ -54,6 +54,8 @@ using std::vector; namespace quickstep { +constexpr QueryContext::insert_destination_id QueryContext::kInvalidInsertDestinationId; + QueryContext::QueryContext(const serialization::QueryContext &proto, const CatalogDatabaseLite &database, StorageManager *storage_manager, http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/238b2d66/query_optimizer/ExecutionGenerator.cpp ---------------------------------------------------------------------- diff --git a/query_optimizer/ExecutionGenerator.cpp b/query_optimizer/ExecutionGenerator.cpp index 70b69e0..4fbda44 100644 --- a/query_optimizer/ExecutionGenerator.cpp +++ b/query_optimizer/ExecutionGenerator.cpp @@ -138,8 +138,12 @@ #include "gflags/gflags.h" #include "glog/logging.h" +using std::find; +using std::make_unique; using std::move; +using std::size_t; using std::static_pointer_cast; +using std::swap; using std::unique_ptr; using std::unordered_map; using std::vector; @@ -163,6 +167,8 @@ static const volatile bool aggregate_hashtable_type_dummy DEFINE_bool(parallelize_load, true, "Parallelize loading data files."); +DEFINE_uint64(num_repartitions, 4, "Number of repartitions for a hash join."); + namespace E = ::quickstep::optimizer::expressions; namespace P = ::quickstep::optimizer::physical; namespace S = ::quickstep::serialization; @@ -428,7 +434,8 @@ void ExecutionGenerator::convertTableReference( std::piecewise_construct, std::forward_as_tuple(physical_table_reference), std::forward_as_tuple(CatalogRelationInfo::kInvalidOperatorIndex, - catalog_relation)); + catalog_relation, + QueryContext::kInvalidInsertDestinationId)); } void ExecutionGenerator::convertSample(const P::SamplePtr &physical_sample) { @@ -468,8 +475,9 @@ void ExecutionGenerator::convertSample(const P::SamplePtr &physical_sample) { std::piecewise_construct, std::forward_as_tuple(physical_sample), std::forward_as_tuple(sample_index, - output_relation)); - temporary_relation_info_vec_.emplace_back(sample_index, output_relation); + output_relation, + insert_destination_index)); + temporary_relation_info_vec_.emplace_back(sample_index, output_relation, insert_destination_index); } bool ExecutionGenerator::convertSimpleProjection( @@ -602,8 +610,9 @@ void ExecutionGenerator::convertSelection( std::piecewise_construct, std::forward_as_tuple(physical_selection), std::forward_as_tuple(select_index, - output_relation)); - temporary_relation_info_vec_.emplace_back(select_index, output_relation); + output_relation, + insert_destination_index)); + temporary_relation_info_vec_.emplace_back(select_index, output_relation, insert_destination_index); if (lip_filter_generator_ != nullptr) { lip_filter_generator_->addSelectionInfo(physical_selection, select_index); @@ -679,13 +688,72 @@ void ExecutionGenerator::convertFilterJoin(const P::FilterJoinPtr &physical_plan std::piecewise_construct, std::forward_as_tuple(physical_plan), std::forward_as_tuple(probe_relation_info->producer_operator_index, - probe_relation_info->relation)); + probe_relation_info->relation, + probe_relation_info->output_destination_index)); DCHECK(lip_filter_generator_ != nullptr); lip_filter_generator_->addFilterJoinInfo(physical_plan, build_filter_operator_index); } +namespace { + +bool areSamePartitionSchemeHeaders(const PartitionSchemeHeader &lhs_partition_header, + const CatalogRelationSchema &lhs_scheme, + const PartitionSchemeHeader &rhs_partition_header, + const CatalogRelationSchema &rhs_scheme) { + if (lhs_partition_header.getPartitionType() != rhs_partition_header.getPartitionType()) { + return false; + } + + if (lhs_partition_header.getNumPartitions() != rhs_partition_header.getNumPartitions()) { + return false; + } + + // Check whether the underlying types in CatalogAttribute are the same. + if (!lhs_scheme.getAttributeById(lhs_partition_header.getPartitionAttributeId())->getType().equals( + rhs_scheme.getAttributeById(rhs_partition_header.getPartitionAttributeId())->getType())) { + return false; + } + + switch (lhs_partition_header.getPartitionType()) { + case PartitionSchemeHeader::PartitionType::kHash: + return true; + case PartitionSchemeHeader::PartitionType::kRange: { + const vector<TypedValue> &lhs_ranges = + static_cast<const RangePartitionSchemeHeader&>(lhs_partition_header).getPartitionRangeBoundaries(); + const vector<TypedValue> &rhs_ranges = + static_cast<const RangePartitionSchemeHeader&>(rhs_partition_header).getPartitionRangeBoundaries(); + + return lhs_ranges == rhs_ranges; + } + } + + return false; +} + + +// Note that this method will be deprecated once the partition scheme header +// supports multiple partition attributes. +size_t chooseBestRepartitionAttributeIndex(const CatalogRelationStatistics &stats, + const vector<attribute_id> &join_attributes) { + size_t chose_attr_index = static_cast<size_t>(-1); + size_t chose_attr_num_distinct_values = 0; + + for (std::size_t i = 0; i < join_attributes.size(); ++i) { + const attribute_id attr = join_attributes[i]; + if (stats.hasNumDistinctValues(attr) && + stats.getNumDistinctValues(attr) > chose_attr_num_distinct_values) { + chose_attr_index = i; + chose_attr_num_distinct_values = stats.getNumDistinctValues(attr); + } + } + + return (chose_attr_index != static_cast<size_t>(-1)) ? chose_attr_index : 0; +} + +} // namespace + void ExecutionGenerator::convertHashJoin(const P::HashJoinPtr &physical_plan) { // HashJoin is converted to three operators: // BuildHash, HashJoin, DestroyHash. The second is the primary operator. @@ -696,13 +764,10 @@ void ExecutionGenerator::convertHashJoin(const P::HashJoinPtr &physical_plan) { std::vector<attribute_id> probe_attribute_ids; std::vector<attribute_id> build_attribute_ids; - std::size_t build_cardinality = - cost_model_for_hash_join_->estimateCardinality(build_physical); - bool any_probe_attributes_nullable = false; bool any_build_attributes_nullable = false; - const std::vector<E::AttributeReferencePtr> &left_join_attributes = + std::vector<E::AttributeReferencePtr> left_join_attributes = physical_plan->left_join_attributes(); for (const E::AttributeReferencePtr &left_join_attribute : left_join_attributes) { const CatalogAttribute *probe_catalog_attribute @@ -714,7 +779,7 @@ void ExecutionGenerator::convertHashJoin(const P::HashJoinPtr &physical_plan) { } } - const std::vector<E::AttributeReferencePtr> &right_join_attributes = + std::vector<E::AttributeReferencePtr> right_join_attributes = physical_plan->right_join_attributes(); for (const E::AttributeReferencePtr &right_join_attribute : right_join_attributes) { const CatalogAttribute *build_catalog_attribute @@ -740,6 +805,202 @@ void ExecutionGenerator::convertHashJoin(const P::HashJoinPtr &physical_plan) { key_types.push_back(&left_attribute_type); } + const CatalogRelationInfo *build_relation_info = + findRelationInfoOutputByPhysical(build_physical); + const CatalogRelationInfo *probe_operator_info = + findRelationInfoOutputByPhysical(probe_physical); + + const CatalogRelation *build_relation = build_relation_info->relation; + const CatalogRelation *probe_relation = probe_operator_info->relation; + + // FIXME(quickstep-team): Add support for self-join. + if (build_relation == probe_relation) { + THROW_SQL_ERROR() << "Self-join is not supported"; + } + + const PartitionScheme *build_partition_scheme = build_relation->getPartitionScheme(); + const PartitionScheme *probe_partition_scheme = probe_relation->getPartitionScheme(); + + bool build_needs_repartition = false; + bool probe_needs_repartition = false; + bool needs_swap = false; + if (build_partition_scheme && probe_partition_scheme) { + const PartitionSchemeHeader &build_partition_scheme_header = build_partition_scheme->getPartitionSchemeHeader(); + const PartitionSchemeHeader &probe_partition_scheme_header = probe_partition_scheme->getPartitionSchemeHeader(); + + switch (build_partition_scheme_header.getPartitionType()) { + case PartitionSchemeHeader::PartitionType::kRange: + build_needs_repartition = true; + + switch (probe_partition_scheme_header.getPartitionType()) { + case PartitionSchemeHeader::PartitionType::kRange: + probe_needs_repartition = true; + break; + case PartitionSchemeHeader::PartitionType::kHash: { + const attribute_id probe_partition_attr = probe_partition_scheme_header.getPartitionAttributeId(); + if (find(probe_attribute_ids.begin(), probe_attribute_ids.end(), probe_partition_attr) != + probe_attribute_ids.end()) { + needs_swap = true; + } else { + probe_needs_repartition = true; + } + break; + } + } + break; + case PartitionSchemeHeader::PartitionType::kHash: { + const attribute_id build_partition_attr = build_partition_scheme_header.getPartitionAttributeId(); + if (find(build_attribute_ids.begin(), build_attribute_ids.end(), build_partition_attr) != + build_attribute_ids.end()) { + // BuildRelation has a useful partition. + switch (probe_partition_scheme_header.getPartitionType()) { + case PartitionSchemeHeader::PartitionType::kRange: + probe_needs_repartition = true; + break; + case PartitionSchemeHeader::PartitionType::kHash: { + if (areSamePartitionSchemeHeaders(build_partition_scheme_header, *build_relation, + probe_partition_scheme_header, *probe_relation)) { + if (cost_model_for_hash_join_->estimateCardinality(build_physical) > + cost_model_for_hash_join_->estimateCardinality(probe_physical)) { + needs_swap = true; + } + } else { + probe_needs_repartition = true; + } + break; + } + } + } else { + build_needs_repartition = true; + + switch (probe_partition_scheme_header.getPartitionType()) { + case PartitionSchemeHeader::PartitionType::kRange: + probe_needs_repartition = true; + break; + case PartitionSchemeHeader::PartitionType::kHash: { + const attribute_id probe_partition_attr = probe_partition_scheme_header.getPartitionAttributeId(); + if (find(probe_attribute_ids.begin(), probe_attribute_ids.end(), probe_partition_attr) != + probe_attribute_ids.end()) { + needs_swap = true; + } else { + probe_needs_repartition = true; + } + break; + } + } + } + break; + } + } + } else if (probe_partition_scheme) { + needs_swap = true; + + const PartitionSchemeHeader &probe_partition_scheme_header = probe_partition_scheme->getPartitionSchemeHeader(); + switch (probe_partition_scheme_header.getPartitionType()) { + case PartitionSchemeHeader::PartitionType::kRange: + probe_needs_repartition = true; + break; + case PartitionSchemeHeader::PartitionType::kHash: { + const attribute_id probe_partition_attr = probe_partition_scheme_header.getPartitionAttributeId(); + + probe_needs_repartition = + (find(probe_attribute_ids.begin(), probe_attribute_ids.end(), probe_partition_attr) == + probe_attribute_ids.end()); + break; + } + } + } else if (build_partition_scheme) { + const PartitionSchemeHeader &build_partition_scheme_header = build_partition_scheme->getPartitionSchemeHeader(); + switch (build_partition_scheme_header.getPartitionType()) { + case PartitionSchemeHeader::PartitionType::kRange: + build_needs_repartition = true; + break; + case PartitionSchemeHeader::PartitionType::kHash: { + const attribute_id build_partition_attr = build_partition_scheme_header.getPartitionAttributeId(); + build_needs_repartition = + (find(build_attribute_ids.begin(), build_attribute_ids.end(), build_partition_attr) == + build_attribute_ids.end()); + break; + } + } + } + + if (needs_swap) { + swap(probe_physical, build_physical); + swap(probe_attribute_ids, build_attribute_ids); + swap(any_probe_attributes_nullable, any_build_attributes_nullable); + swap(left_join_attributes, right_join_attributes); + swap(probe_operator_info, build_relation_info); + swap(probe_relation, build_relation); + swap(probe_partition_scheme, build_partition_scheme); + swap(probe_needs_repartition, build_needs_repartition); + } + + unique_ptr<PartitionScheme> probe_repartition_scheme; + if (build_needs_repartition) { + const size_t repartition_attr_index = + chooseBestRepartitionAttributeIndex(build_relation->getStatistics(), build_attribute_ids); + auto build_repartition_scheme_header = + make_unique<HashPartitionSchemeHeader>(FLAGS_num_repartitions, + build_attribute_ids[repartition_attr_index]); + auto build_repartition_scheme = make_unique<PartitionScheme>(build_repartition_scheme_header.release()); + + build_partition_scheme = build_repartition_scheme.get(); + + if (build_relation_info->isStoredRelation()) { + THROW_SQL_ERROR() << "Re-partition for the base build table is not supported"; + } else { + S::InsertDestination *build_insert_destination_proto = + query_context_proto_->mutable_insert_destinations(build_relation_info->output_destination_index); + + build_insert_destination_proto->set_insert_destination_type(S::InsertDestinationType::PARTITION_AWARE); + build_insert_destination_proto->MutableExtension(S::PartitionAwareInsertDestination::partition_scheme) + ->MergeFrom(build_repartition_scheme->getProto()); + + CatalogRelation *mutable_build_relation = + catalog_database_->getRelationByIdMutable(build_relation->getID()); + mutable_build_relation->setPartitionScheme(build_repartition_scheme.release()); + } + + if (probe_needs_repartition) { + auto probe_repartition_scheme_header = + make_unique<HashPartitionSchemeHeader>(FLAGS_num_repartitions, + probe_attribute_ids[repartition_attr_index]); + probe_repartition_scheme = make_unique<PartitionScheme>(probe_repartition_scheme_header.release()); + } + } else if (probe_needs_repartition) { + const PartitionSchemeHeader &build_partition_scheme_header = build_partition_scheme->getPartitionSchemeHeader(); + const attribute_id build_partition_attr = build_partition_scheme_header.getPartitionAttributeId(); + + size_t repartition_attr_index = 0; + while (build_attribute_ids[repartition_attr_index] != build_partition_attr) { + ++repartition_attr_index; + } + auto probe_repartition_scheme_header = + make_unique<HashPartitionSchemeHeader>(build_partition_scheme_header.getNumPartitions(), + probe_attribute_ids[repartition_attr_index]); + probe_repartition_scheme = make_unique<PartitionScheme>(probe_repartition_scheme_header.release()); + } + + if (probe_needs_repartition) { + probe_partition_scheme = probe_repartition_scheme.get(); + + if (probe_operator_info->isStoredRelation()) { + THROW_SQL_ERROR() << "Re-partition for the base probe table is not supported"; + } else { + S::InsertDestination *probe_insert_destination_proto = + query_context_proto_->mutable_insert_destinations(probe_operator_info->output_destination_index); + + probe_insert_destination_proto->set_insert_destination_type(S::InsertDestinationType::PARTITION_AWARE); + probe_insert_destination_proto->MutableExtension(S::PartitionAwareInsertDestination::partition_scheme) + ->MergeFrom(probe_repartition_scheme->getProto()); + + CatalogRelation *mutable_probe_relation = + catalog_database_->getRelationByIdMutable(probe_relation->getID()); + mutable_probe_relation->setPartitionScheme(probe_repartition_scheme.release()); + } + } + // Convert the residual predicate proto. QueryContext::predicate_id residual_predicate_index = QueryContext::kInvalidPredicateId; if (physical_plan->residual_predicate()) { @@ -755,11 +1016,6 @@ void ExecutionGenerator::convertHashJoin(const P::HashJoinPtr &physical_plan) { convertNamedExpressions(physical_plan->project_expressions(), query_context_proto_->add_scalar_groups()); - const CatalogRelationInfo *build_relation_info = - findRelationInfoOutputByPhysical(build_physical); - const CatalogRelationInfo *probe_operator_info = - findRelationInfoOutputByPhysical(probe_physical); - // Create a vector that indicates whether each project expression is using // attributes from the build relation as input. This information is required // by the current implementation of hash left outer join @@ -772,30 +1028,17 @@ void ExecutionGenerator::convertHashJoin(const P::HashJoinPtr &physical_plan) { build_physical->getOutputAttributes()))); } - const CatalogRelation *build_relation = build_relation_info->relation; - - // FIXME(quickstep-team): Add support for self-join. - if (build_relation == probe_operator_info->relation) { - THROW_SQL_ERROR() << "Self-join is not supported"; - } - // Create join hash table proto. const QueryContext::join_hash_table_id join_hash_table_index = query_context_proto_->join_hash_tables_size(); S::QueryContext::HashTableContext *hash_table_context_proto = query_context_proto_->add_join_hash_tables(); - // No partition. - std::size_t num_partitions = 1; - if (build_relation->hasPartitionScheme() && - build_attribute_ids.size() == 1) { - const PartitionSchemeHeader &partition_scheme_header = - build_relation->getPartitionScheme()->getPartitionSchemeHeader(); - if (build_attribute_ids[0] == partition_scheme_header.getPartitionAttributeId()) { - // TODO(zuyu): add optimizer support for partitioned hash joins. - hash_table_context_proto->set_num_partitions(num_partitions); - } - } + const std::size_t build_num_partitions = + build_partition_scheme + ? build_partition_scheme->getPartitionSchemeHeader().getNumPartitions() + : 1u; + hash_table_context_proto->set_num_partitions(build_num_partitions); S::HashTable *hash_table_proto = hash_table_context_proto->mutable_join_hash_table(); @@ -808,10 +1051,12 @@ void ExecutionGenerator::convertHashJoin(const P::HashJoinPtr &physical_plan) { key_types)); for (const attribute_id build_attribute : build_attribute_ids) { - hash_table_proto->add_key_types()->CopyFrom( + hash_table_proto->add_key_types()->MergeFrom( build_relation->getAttributeById(build_attribute)->getType().getProto()); } + const std::size_t build_cardinality = + cost_model_for_hash_join_->estimateCardinality(build_physical); hash_table_proto->set_estimated_num_entries(build_cardinality); // Create three operators. @@ -823,7 +1068,7 @@ void ExecutionGenerator::convertHashJoin(const P::HashJoinPtr &physical_plan) { build_relation_info->isStoredRelation(), build_attribute_ids, any_build_attributes_nullable, - num_partitions, + build_num_partitions, join_hash_table_index)); // Create InsertDestination proto. @@ -862,11 +1107,11 @@ void ExecutionGenerator::convertHashJoin(const P::HashJoinPtr &physical_plan) { new HashJoinOperator( query_handle_->query_id(), *build_relation, - *probe_operator_info->relation, + *probe_relation, probe_operator_info->isStoredRelation(), probe_attribute_ids, any_probe_attributes_nullable, - num_partitions, + build_num_partitions, *output_relation, insert_destination_index, join_hash_table_index, @@ -878,7 +1123,7 @@ void ExecutionGenerator::convertHashJoin(const P::HashJoinPtr &physical_plan) { const QueryPlan::DAGNodeIndex destroy_operator_index = execution_plan_->addRelationalOperator(new DestroyHashOperator( - query_handle_->query_id(), num_partitions, join_hash_table_index)); + query_handle_->query_id(), build_num_partitions, join_hash_table_index)); if (!build_relation_info->isStoredRelation()) { execution_plan_->addDirectDependency(build_operator_index, @@ -909,8 +1154,9 @@ void ExecutionGenerator::convertHashJoin(const P::HashJoinPtr &physical_plan) { std::piecewise_construct, std::forward_as_tuple(physical_plan), std::forward_as_tuple(join_operator_index, - output_relation)); - temporary_relation_info_vec_.emplace_back(join_operator_index, output_relation); + output_relation, + insert_destination_index)); + temporary_relation_info_vec_.emplace_back(join_operator_index, output_relation, insert_destination_index); if (lip_filter_generator_ != nullptr) { lip_filter_generator_->addHashJoinInfo(physical_plan, @@ -986,8 +1232,9 @@ void ExecutionGenerator::convertNestedLoopsJoin( std::piecewise_construct, std::forward_as_tuple(physical_plan), std::forward_as_tuple(join_operator_index, - output_relation)); - temporary_relation_info_vec_.emplace_back(join_operator_index, output_relation); + output_relation, + insert_destination_index)); + temporary_relation_info_vec_.emplace_back(join_operator_index, output_relation, insert_destination_index); } void ExecutionGenerator::convertCopyFrom( @@ -1612,9 +1859,12 @@ void ExecutionGenerator::convertAggregate( physical_to_output_relation_map_.emplace( std::piecewise_construct, std::forward_as_tuple(physical_plan), - std::forward_as_tuple(finalize_aggregation_operator_index, output_relation)); + std::forward_as_tuple(finalize_aggregation_operator_index, + output_relation, + insert_destination_index)); temporary_relation_info_vec_.emplace_back(finalize_aggregation_operator_index, - output_relation); + output_relation, + insert_destination_index); const QueryPlan::DAGNodeIndex destroy_aggregation_state_operator_index = execution_plan_->addRelationalOperator( @@ -1759,9 +2009,12 @@ void ExecutionGenerator::convertCrossReferenceCoalesceAggregate( physical_to_output_relation_map_.emplace( std::piecewise_construct, std::forward_as_tuple(physical_plan), - std::forward_as_tuple(finalize_aggregation_operator_index, output_relation)); + std::forward_as_tuple(finalize_aggregation_operator_index, + output_relation, + insert_destination_index)); temporary_relation_info_vec_.emplace_back(finalize_aggregation_operator_index, - output_relation); + output_relation, + insert_destination_index); const QueryPlan::DAGNodeIndex destroy_aggregation_state_operator_index = execution_plan_->addRelationalOperator( @@ -1817,7 +2070,8 @@ void ExecutionGenerator::convertSort(const P::SortPtr &physical_sort) { false /* is_pipeline_breaker */); } temporary_relation_info_vec_.emplace_back(run_generator_index, - initial_runs_relation); + initial_runs_relation, + initial_runs_destination_id); initial_runs_destination_proto->set_relational_op_index(run_generator_index); // Create sort configuration for run merging. @@ -1892,12 +2146,14 @@ void ExecutionGenerator::convertSort(const P::SortPtr &physical_sort) { true /* is_pipeline_breaker */); temporary_relation_info_vec_.emplace_back(merge_run_operator_index, - sorted_relation); + sorted_relation, + sorted_output_destination_id); physical_to_output_relation_map_.emplace( std::piecewise_construct, std::forward_as_tuple(physical_sort), std::forward_as_tuple(merge_run_operator_index, - sorted_relation)); + sorted_relation, + sorted_output_destination_id)); } void ExecutionGenerator::convertTableGenerator( @@ -1932,8 +2188,9 @@ void ExecutionGenerator::convertTableGenerator( std::piecewise_construct, std::forward_as_tuple(physical_tablegen), std::forward_as_tuple(tablegen_index, - output_relation)); - temporary_relation_info_vec_.emplace_back(tablegen_index, output_relation); + output_relation, + insert_destination_index)); + temporary_relation_info_vec_.emplace_back(tablegen_index, output_relation, insert_destination_index); } void ExecutionGenerator::convertWindowAggregate( @@ -2036,9 +2293,12 @@ void ExecutionGenerator::convertWindowAggregate( physical_to_output_relation_map_.emplace( std::piecewise_construct, std::forward_as_tuple(physical_plan), - std::forward_as_tuple(window_aggregation_operator_index, output_relation)); + std::forward_as_tuple(window_aggregation_operator_index, + output_relation, + insert_destination_index)); temporary_relation_info_vec_.emplace_back(window_aggregation_operator_index, - output_relation); + output_relation, + insert_destination_index); } } // namespace optimizer http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/238b2d66/query_optimizer/ExecutionGenerator.hpp ---------------------------------------------------------------------- diff --git a/query_optimizer/ExecutionGenerator.hpp b/query_optimizer/ExecutionGenerator.hpp index f4e614a..44a8a50 100644 --- a/query_optimizer/ExecutionGenerator.hpp +++ b/query_optimizer/ExecutionGenerator.hpp @@ -135,9 +135,11 @@ class ExecutionGenerator { */ struct CatalogRelationInfo { CatalogRelationInfo(const QueryPlan::DAGNodeIndex producer_operator_index_in, - const CatalogRelation *relation_in) + const CatalogRelation *relation_in, + const QueryContext::insert_destination_id output_destination_index_in) : producer_operator_index(producer_operator_index_in), - relation(relation_in) {} + relation(relation_in), + output_destination_index(output_destination_index_in) {} /** * @return True if the relation is a stored relation (i.e. not a temporary relation @@ -149,6 +151,7 @@ class ExecutionGenerator { const QueryPlan::DAGNodeIndex producer_operator_index; const CatalogRelation *relation; + const QueryContext::insert_destination_id output_destination_index; /** * @brief Represents an invalid node index. http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/238b2d66/query_optimizer/tests/execution_generator/Partition.test ---------------------------------------------------------------------- diff --git a/query_optimizer/tests/execution_generator/Partition.test b/query_optimizer/tests/execution_generator/Partition.test index ab05391..668b050 100644 --- a/query_optimizer/tests/execution_generator/Partition.test +++ b/query_optimizer/tests/execution_generator/Partition.test @@ -15,19 +15,35 @@ # specific language governing permissions and limitations # under the License. -CREATE TABLE foo (id INT NULL, - name CHAR(20)) +CREATE TABLE dim_4_hash_partitions (id INT NULL, + char_col CHAR(20)) +PARTITION BY HASH(id) PARTITIONS 4; +CREATE TABLE dim_2_hash_partitions (id INT NULL, + char_col CHAR(20)) +PARTITION BY HASH(id) PARTITIONS 2; +CREATE TABLE fact (id INT NULL, + score DOUBLE NULL) PARTITION BY HASH(id) PARTITIONS 4; -INSERT INTO foo +INSERT INTO dim_4_hash_partitions SELECT int_col, char_col FROM test WHERE int_col > 0 OR int_col < 0; -SELECT * FROM foo; +INSERT INTO dim_2_hash_partitions +SELECT int_col, char_col +FROM test +WHERE int_col > 0 OR int_col < 0; + +INSERT INTO fact +SELECT int_col, double_col +FROM test +WHERE int_col % 2 = 0; + +SELECT * FROM dim_4_hash_partitions; -- +-----------+--------------------+ -|id |name | +|id |char_col | +-----------+--------------------+ | 4| 4 2.000000| | 8| 8 2.828427| @@ -52,3 +68,48 @@ SELECT * FROM foo; | -17| -17 4.123106| | -21| -21 4.582576| +-----------+--------------------+ +== + +# Partitioned Hash Join. +SELECT fact.id, dim_4_hash_partitions.char_col +FROM dim_4_hash_partitions JOIN fact ON dim_4_hash_partitions.id = fact.id; +-- ++-----------+--------------------+ +|id |char_col | ++-----------+--------------------+ +| 4| 4 2.000000| +| 8| 8 2.828427| +| 12| 12 3.464102| +| 16| 16 4.000000| +| 24| 24 4.898979| +| 2| 2 1.414214| +| 6| 6 2.449490| +| 14| 14 3.741657| +| 18| 18 4.242641| +| 22| 22 4.690416| ++-----------+--------------------+ +== + +# Hash Join with two stored relations, one of which is partitioned. +SELECT fact.id, test.char_col +FROM test JOIN fact ON test.int_col = fact.id; +-- +[same as above] +== + +# Hash Join with one stored, partitioned relation, +# and a non-stored, non-partitioned one. +SELECT fact.id, test.char_col +FROM fact JOIN test ON fact.id = test.int_col +WHERE test.int_col % 2 = 0; +-- +[same as above] +== + +# Repartitioned Hash Join. +SELECT fact.id, dim_2_hash_partitions.char_col +FROM dim_2_hash_partitions, fact +WHERE dim_2_hash_partitions.id = fact.id + AND dim_2_hash_partitions.id % 2 = 0; +-- +[same as above] http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/238b2d66/types/TypedValue.hpp ---------------------------------------------------------------------- diff --git a/types/TypedValue.hpp b/types/TypedValue.hpp index 0ba3d53..1b564c5 100644 --- a/types/TypedValue.hpp +++ b/types/TypedValue.hpp @@ -253,6 +253,25 @@ class TypedValue { } /** + * @brief Equal operator. + **/ + bool operator==(const TypedValue &rhs) const { + if (getTypeID() != rhs.getTypeID()) { + return false; + } + + if (isNull() != rhs.isNull()) { + return false; + } + + if (isNull()) { + return true; + } + + return fastEqualCheck(rhs); + } + + /** * @brief Create a new literal TypedValue with pre-allocated out-of-line * data. * @warning The memory at value_ptr must be allocated with malloc() or