http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/c9214ecb/relational_operators/SortMergeRunOperator.cpp ---------------------------------------------------------------------- diff --git a/relational_operators/SortMergeRunOperator.cpp b/relational_operators/SortMergeRunOperator.cpp index 6bf5719..e398d62 100644 --- a/relational_operators/SortMergeRunOperator.cpp +++ b/relational_operators/SortMergeRunOperator.cpp @@ -23,9 +23,11 @@ #include <vector> #include "query_execution/QueryExecutionTypedefs.hpp" +#include "query_execution/WorkOrderProtosContainer.hpp" #include "query_execution/WorkOrdersContainer.hpp" #include "relational_operators/SortMergeRunOperator.pb.h" #include "relational_operators/SortMergeRunOperatorHelpers.hpp" +#include "relational_operators/WorkOrder.pb.h" #include "threading/ThreadIDBasedMap.hpp" #include "glog/logging.h" @@ -69,6 +71,72 @@ bool SortMergeRunOperator::getAllWorkOrders( return generateWorkOrders(container, query_context, storage_manager, scheduler_client_id, bus); } +bool SortMergeRunOperator::getAllWorkOrderProtos(WorkOrderProtosContainer *container) { + if (input_relation_is_stored_) { + // Input blocks (or runs) are from base relation. Only possible when base + // relation is stored sorted. + if (!started_) { + // Initialize merge tree completely, since all input runs are known. + merge_tree_.initializeTree(input_relation_block_ids_.size()); + started_ = true; + initializeInputRuns(); + } + } else { + // Input blocks (or runs) are pipelined from the sorted run generation + // operator. + if (!started_ && !input_stream_done_) { + // Initialize merge tree for first pipeline mode. + merge_tree_.initializeForPipeline(); + started_ = true; + initializeInputRuns(); + } + } + + // Get merge jobs from merge tree. + std::vector<MergeTree::MergeJob> jobs; + const bool done_generating = merge_tree_.getMergeJobs(&jobs); + + for (std::vector<MergeTree::MergeJob>::size_type job_id = 0; + job_id < jobs.size(); + ++job_id) { + // Add work order for each merge job. + container->addWorkOrderProto(createWorkOrderProto(&jobs[job_id]), op_index_); + } + + return done_generating; +} + +serialization::WorkOrder* SortMergeRunOperator::createWorkOrderProto( + merge_run_operator::MergeTree::MergeJob *job) { + DCHECK(job != nullptr); + DCHECK(!job->runs.empty()); + + serialization::WorkOrder *proto = new serialization::WorkOrder; + proto->set_work_order_type(serialization::SORT_MERGE_RUN); + proto->set_query_id(query_id_); + + proto->SetExtension(serialization::SortMergeRunWorkOrder::operator_index, op_index_); + proto->SetExtension(serialization::SortMergeRunWorkOrder::sort_config_index, sort_config_index_); + + for (const merge_run_operator::Run &run : job->runs) { + serialization::Run *run_proto = proto->AddExtension(serialization::SortMergeRunWorkOrder::runs); + for (const block_id block : run) { + run_proto->add_blocks(block); + } + } + + proto->SetExtension(serialization::SortMergeRunWorkOrder::top_k, top_k_); + proto->SetExtension(serialization::SortMergeRunWorkOrder::merge_level, job->level); + proto->SetExtension(serialization::SortMergeRunWorkOrder::relation_id, + job->level > 0 ? run_relation_.getID() + : input_relation_.getID()); + proto->SetExtension(serialization::SortMergeRunWorkOrder::insert_destination_index, + job->is_final_level ? output_destination_index_ + : run_block_destination_index_); + + return proto; +} + WorkOrder *SortMergeRunOperator::createWorkOrder( merge_run_operator::MergeTree::MergeJob *job, QueryContext *query_context,
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/c9214ecb/relational_operators/SortMergeRunOperator.hpp ---------------------------------------------------------------------- diff --git a/relational_operators/SortMergeRunOperator.hpp b/relational_operators/SortMergeRunOperator.hpp index cfff8b9..177836f 100644 --- a/relational_operators/SortMergeRunOperator.hpp +++ b/relational_operators/SortMergeRunOperator.hpp @@ -44,8 +44,11 @@ namespace quickstep { class CatalogRelationSchema; class InsertDestination; class StorageManager; +class WorkOrderProtosContainer; class WorkOrdersContainer; +namespace serialization { class WorkOrder; } + /** * @defgroup SortMergeRun Merging Sorted Runs * @ingroup Sort @@ -132,6 +135,8 @@ class SortMergeRunOperator : public RelationalOperator { const tmb::client_id scheduler_client_id, tmb::MessageBus *bus) override; + bool getAllWorkOrderProtos(WorkOrderProtosContainer *container) override; + void feedInputBlock(const block_id input_block_id, const relation_id input_relation_id) override { input_relation_block_ids_.push_back(input_block_id); @@ -182,6 +187,13 @@ class SortMergeRunOperator : public RelationalOperator { const tmb::client_id scheduler_client_id, tmb::MessageBus *bus); + /** + * @brief Create Work Order proto. + * + * @param job The merge job. + **/ + serialization::WorkOrder* createWorkOrderProto(merge_run_operator::MergeTree::MergeJob *job); + const CatalogRelation &input_relation_; const CatalogRelation &output_relation_; http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/c9214ecb/relational_operators/SortRunGenerationOperator.cpp ---------------------------------------------------------------------- diff --git a/relational_operators/SortRunGenerationOperator.cpp b/relational_operators/SortRunGenerationOperator.cpp index 37b8fb8..d7362db 100644 --- a/relational_operators/SortRunGenerationOperator.cpp +++ b/relational_operators/SortRunGenerationOperator.cpp @@ -21,7 +21,9 @@ #include "catalog/CatalogRelation.hpp" #include "query_execution/QueryContext.hpp" +#include "query_execution/WorkOrderProtosContainer.hpp" #include "query_execution/WorkOrdersContainer.hpp" +#include "relational_operators/WorkOrder.pb.h" #include "storage/InsertDestination.hpp" #include "storage/StorageBlock.hpp" #include "storage/StorageManager.hpp" @@ -80,6 +82,43 @@ bool SortRunGenerationOperator::getAllWorkOrders( } } +bool SortRunGenerationOperator::getAllWorkOrderProtos(WorkOrderProtosContainer *container) { + if (input_relation_is_stored_) { + // Input blocks are from a base relation. + if (!started_) { + for (const block_id input_block_id : input_relation_block_ids_) { + container->addWorkOrderProto(createWorkOrderProto(input_block_id), op_index_); + } + started_ = true; + } + return true; + } else { + // Input blocks are pipelined. + while (num_workorders_generated_ < input_relation_block_ids_.size()) { + container->addWorkOrderProto( + createWorkOrderProto(input_relation_block_ids_[num_workorders_generated_]), + op_index_); + ++num_workorders_generated_; + } + return done_feeding_input_relation_; + } +} + +serialization::WorkOrder* SortRunGenerationOperator::createWorkOrderProto(const block_id block) { + serialization::WorkOrder *proto = new serialization::WorkOrder; + proto->set_work_order_type(serialization::SORT_RUN_GENERATION); + proto->set_query_id(query_id_); + + proto->SetExtension(serialization::SortRunGenerationWorkOrder::sort_config_index, sort_config_index_); + proto->SetExtension(serialization::SortRunGenerationWorkOrder::relation_id, input_relation_.getID()); + proto->SetExtension(serialization::SortRunGenerationWorkOrder::insert_destination_index, + output_destination_index_); + proto->SetExtension(serialization::SortRunGenerationWorkOrder::block_id, block); + + return proto; +} + + void SortRunGenerationWorkOrder::execute() { BlockReference block( storage_manager_->getBlock(input_block_id_, input_relation_)); http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/c9214ecb/relational_operators/SortRunGenerationOperator.hpp ---------------------------------------------------------------------- diff --git a/relational_operators/SortRunGenerationOperator.hpp b/relational_operators/SortRunGenerationOperator.hpp index f96e6a6..96a3ce1 100644 --- a/relational_operators/SortRunGenerationOperator.hpp +++ b/relational_operators/SortRunGenerationOperator.hpp @@ -40,8 +40,11 @@ namespace quickstep { class CatalogRelationSchema; class InsertDestination; class StorageManager; +class WorkOrderProtosContainer; class WorkOrdersContainer; +namespace serialization { class WorkOrder; } + /** * \defgroup Sort Sorting * \ingroup RelationalOperators @@ -112,6 +115,8 @@ class SortRunGenerationOperator : public RelationalOperator { const tmb::client_id scheduler_client_id, tmb::MessageBus *bus) override; + bool getAllWorkOrderProtos(WorkOrderProtosContainer *container) override; + void feedInputBlock(const block_id input_block_id, const relation_id input_relation_id) override { DCHECK(input_relation_id == input_relation_.getID()); input_relation_block_ids_.push_back(input_block_id); @@ -133,6 +138,13 @@ class SortRunGenerationOperator : public RelationalOperator { } private: + /** + * @brief Create Work Order proto. + * + * @param block The block id used in the Work Order. + **/ + serialization::WorkOrder* createWorkOrderProto(const block_id block); + const CatalogRelation &input_relation_; const CatalogRelation &output_relation_; http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/c9214ecb/relational_operators/TableGeneratorOperator.cpp ---------------------------------------------------------------------- diff --git a/relational_operators/TableGeneratorOperator.cpp b/relational_operators/TableGeneratorOperator.cpp index a3f9340..d5a08ec 100644 --- a/relational_operators/TableGeneratorOperator.cpp +++ b/relational_operators/TableGeneratorOperator.cpp @@ -1,6 +1,6 @@ /** * Copyright 2016, Quickstep Research Group, Computer Sciences Department, - * University of WisconsinâMadison. + * University of WisconsinâMadison. * Copyright 2016 Pivotal Software, Inc. * * Licensed under the Apache License, Version 2.0 (the "License"); @@ -20,7 +20,9 @@ #include "expressions/table_generator/GeneratorFunctionHandle.hpp" #include "query_execution/QueryContext.hpp" +#include "query_execution/WorkOrderProtosContainer.hpp" #include "query_execution/WorkOrdersContainer.hpp" +#include "relational_operators/WorkOrder.pb.h" #include "storage/InsertDestination.hpp" #include "types/containers/ColumnVectorsValueAccessor.hpp" @@ -53,6 +55,22 @@ bool TableGeneratorOperator::getAllWorkOrders( return started_; } +bool TableGeneratorOperator::getAllWorkOrderProtos(WorkOrderProtosContainer *container) { + if (!started_) { + serialization::WorkOrder *proto = new serialization::WorkOrder; + proto->set_work_order_type(serialization::TABLE_GENERATOR); + proto->set_query_id(query_id_); + + proto->SetExtension(serialization::TableGeneratorWorkOrder::generator_function_index, generator_function_index_); + proto->SetExtension(serialization::TableGeneratorWorkOrder::insert_destination_index, output_destination_index_); + + container->addWorkOrderProto(proto, op_index_); + started_ = true; + } + return true; +} + + void TableGeneratorWorkOrder::execute() { ColumnVectorsValueAccessor temp_result; function_handle_.populateColumns(&temp_result); http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/c9214ecb/relational_operators/TableGeneratorOperator.hpp ---------------------------------------------------------------------- diff --git a/relational_operators/TableGeneratorOperator.hpp b/relational_operators/TableGeneratorOperator.hpp index 6a6af4b..1b791a6 100644 --- a/relational_operators/TableGeneratorOperator.hpp +++ b/relational_operators/TableGeneratorOperator.hpp @@ -1,6 +1,6 @@ /** * Copyright 2016, Quickstep Research Group, Computer Sciences Department, - * University of WisconsinâMadison. + * University of WisconsinâMadison. * Copyright 2016 Pivotal Software, Inc. * * Licensed under the Apache License, Version 2.0 (the "License"); @@ -40,6 +40,7 @@ namespace quickstep { class GeneratorFunctionHandle; class InsertDestination; class StorageManager; +class WorkOrderProtosContainer; class WorkOrdersContainer; /** \addtogroup RelationalOperators @@ -81,6 +82,8 @@ class TableGeneratorOperator : public RelationalOperator { const tmb::client_id scheduler_client_id, tmb::MessageBus *bus) override; + bool getAllWorkOrderProtos(WorkOrderProtosContainer *container) override; + void feedInputBlock(const block_id input_block_id, const relation_id input_relation_id) override { } http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/c9214ecb/relational_operators/TextScanOperator.cpp ---------------------------------------------------------------------- diff --git a/relational_operators/TextScanOperator.cpp b/relational_operators/TextScanOperator.cpp index d2fd0cd..49c9150 100644 --- a/relational_operators/TextScanOperator.cpp +++ b/relational_operators/TextScanOperator.cpp @@ -22,6 +22,7 @@ #include <algorithm> #include <cctype> #include <cstddef> +#include <cstdint> #include <cstdio> #include <cstdlib> #include <memory> @@ -31,21 +32,46 @@ #include "catalog/CatalogAttribute.hpp" #include "query_execution/QueryContext.hpp" +#include "query_execution/WorkOrderProtosContainer.hpp" #include "query_execution/WorkOrdersContainer.hpp" +#include "relational_operators/WorkOrder.pb.h" #include "storage/InsertDestination.hpp" #include "types/Type.hpp" #include "types/TypedValue.hpp" -#include "types/containers/Tuple.hpp" #include "types/containers/ColumnVector.hpp" #include "types/containers/ColumnVectorsValueAccessor.hpp" +#include "types/containers/Tuple.hpp" #include "utility/Glob.hpp" +#include "gflags/gflags.h" #include "glog/logging.h" #include "tmb/id_typedefs.h" +using std::size_t; +using std::string; + namespace quickstep { +// Text segment size set to 256KB. +DEFINE_uint64(textscan_text_segment_size, 0x40000, + "Size of text segment in bytes the input text files " + "are split into in the TextScanOperator."); + +// Check if the segment size is positive. +static bool ValidateTextScanTextSegmentSize(const char *flagname, + std::uint64_t text_segment_size) { + if (text_segment_size == 0) { + LOG(ERROR) << "--" << flagname << " must be greater than 0"; + return false; + } + + return true; +} + +static const volatile bool text_scan_text_segment_size_dummy = gflags::RegisterFlagValidator( + &FLAGS_textscan_text_segment_size, &ValidateTextScanTextSegmentSize); + bool TextScanOperator::getAllWorkOrders( WorkOrdersContainer *container, QueryContext *query_context, @@ -56,16 +82,12 @@ bool TextScanOperator::getAllWorkOrders( const std::vector<std::string> files = utility::file::GlobExpand(file_pattern_); - if (files.size() == 0) { - LOG(FATAL) << "No files matched '" << file_pattern_ << "'. Exiting."; - } + CHECK_NE(files.size(), 0u) + << "No files matched '" << file_pattern_ << "'. Exiting."; InsertDestination *output_destination = query_context->getInsertDestination(output_destination_index_); - // Text segment size set to 256KB. - constexpr std::size_t kTextSegmentSize = 0x40000u; - if (blocking_dependencies_met_ && !work_generated_) { for (const std::string &file : files) { // Use standard C libary to retrieve the file size. @@ -75,18 +97,32 @@ bool TextScanOperator::getAllWorkOrders( std::fclose(fp); std::size_t text_offset = 0; - while (text_offset < file_size) { + for (size_t num_full_segments = file_size / FLAGS_textscan_text_segment_size; + num_full_segments > 0; + --num_full_segments, text_offset += FLAGS_textscan_text_segment_size) { + container->addNormalWorkOrder( + new TextScanWorkOrder(query_id_, + file, + text_offset, + FLAGS_textscan_text_segment_size, + field_terminator_, + process_escape_sequences_, + output_destination), + op_index_); + } + + // Deal with the residual partial segment whose size is less than + // 'FLAGS_textscan_text_segment_size'. + if (text_offset < file_size) { container->addNormalWorkOrder( new TextScanWorkOrder(query_id_, file, text_offset, - std::min(kTextSegmentSize, file_size - text_offset), + file_size - text_offset, field_terminator_, process_escape_sequences_, - output_destination, - storage_manager), + output_destination), op_index_); - text_offset += kTextSegmentSize; } } work_generated_ = true; @@ -94,24 +130,53 @@ bool TextScanOperator::getAllWorkOrders( return work_generated_; } -TextScanWorkOrder::TextScanWorkOrder(const std::size_t query_id, - const std::string &filename, - const std::size_t text_offset, - const std::size_t text_segment_size, - const char field_terminator, - const bool process_escape_sequences, - InsertDestination *output_destination, - StorageManager *storage_manager) - : WorkOrder(query_id), - filename_(filename), - text_offset_(text_offset), - text_segment_size_(text_segment_size), - field_terminator_(field_terminator), - process_escape_sequences_(process_escape_sequences), - output_destination_(output_destination), - storage_manager_(storage_manager) { - DCHECK(output_destination_ != nullptr); - DCHECK(storage_manager_ != nullptr); +bool TextScanOperator::getAllWorkOrderProtos(WorkOrderProtosContainer *container) { + const std::vector<std::string> files = utility::file::GlobExpand(file_pattern_); + if (blocking_dependencies_met_ && !work_generated_) { + for (const string &file : files) { + // Use standard C libary to retrieve the file size. + FILE *fp = std::fopen(file.c_str(), "rb"); + std::fseek(fp, 0, SEEK_END); + const std::size_t file_size = std::ftell(fp); + std::fclose(fp); + + size_t text_offset = 0; + for (size_t num_full_segments = file_size / FLAGS_textscan_text_segment_size; + num_full_segments > 0; + --num_full_segments, text_offset += FLAGS_textscan_text_segment_size) { + container->addWorkOrderProto(createWorkOrderProto(file, text_offset, FLAGS_textscan_text_segment_size), + op_index_); + } + + // Deal with the residual partial segment whose size is less than + // 'FLAGS_textscan_text_segment_size'. + if (text_offset < file_size) { + container->addWorkOrderProto(createWorkOrderProto(file, text_offset, file_size - text_offset), + op_index_); + } + } + work_generated_ = true; + } + return work_generated_; +} + +serialization::WorkOrder* TextScanOperator::createWorkOrderProto(const string &filename, + const size_t text_offset, + const size_t text_segment_size) { + serialization::WorkOrder *proto = new serialization::WorkOrder; + proto->set_work_order_type(serialization::TEXT_SCAN); + proto->set_query_id(query_id_); + + proto->SetExtension(serialization::TextScanWorkOrder::filename, filename); + proto->SetExtension(serialization::TextScanWorkOrder::text_offset, text_offset); + proto->SetExtension(serialization::TextScanWorkOrder::text_segment_size, text_segment_size); + proto->SetExtension(serialization::TextScanWorkOrder::field_terminator, field_terminator_); + proto->SetExtension(serialization::TextScanWorkOrder::process_escape_sequences, + process_escape_sequences_); + proto->SetExtension(serialization::TextScanWorkOrder::insert_destination_index, + output_destination_index_); + + return proto; } void TextScanWorkOrder::execute() { http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/c9214ecb/relational_operators/TextScanOperator.hpp ---------------------------------------------------------------------- diff --git a/relational_operators/TextScanOperator.hpp b/relational_operators/TextScanOperator.hpp index d73e7dd..1a62ded 100644 --- a/relational_operators/TextScanOperator.hpp +++ b/relational_operators/TextScanOperator.hpp @@ -33,6 +33,8 @@ #include "types/containers/Tuple.hpp" #include "utility/Macros.hpp" +#include "glog/logging.h" + #include "tmb/id_typedefs.h" namespace tmb { class MessageBus; } @@ -42,8 +44,11 @@ namespace quickstep { class CatalogRelationSchema; class InsertDestination; class StorageManager; +class WorkOrderProtosContainer; class WorkOrdersContainer; +namespace serialization { class WorkOrder; } + /** \addtogroup RelationalOperators * @{ */ @@ -135,6 +140,8 @@ class TextScanOperator : public RelationalOperator { const tmb::client_id scheduler_client_id, tmb::MessageBus *bus) override; + bool getAllWorkOrderProtos(WorkOrderProtosContainer *container) override; + QueryContext::insert_destination_id getInsertDestinationID() const override { return output_destination_index_; } @@ -144,6 +151,10 @@ class TextScanOperator : public RelationalOperator { } private: + serialization::WorkOrder* createWorkOrderProto(const std::string &filename, + const std::size_t text_offset, + const std::size_t text_segment_size); + const std::string file_pattern_; const char field_terminator_; const bool process_escape_sequences_; @@ -173,7 +184,6 @@ class TextScanWorkOrder : public WorkOrder { * @param process_escape_sequences Whether to decode escape sequences in the * text file. * @param output_destination The InsertDestination to insert tuples. - * @param storage_manager The StorageManager to use. **/ TextScanWorkOrder( const std::size_t query_id, @@ -182,8 +192,14 @@ class TextScanWorkOrder : public WorkOrder { const std::size_t text_segment_size, const char field_terminator, const bool process_escape_sequences, - InsertDestination *output_destination, - StorageManager *storage_manager); + InsertDestination *output_destination) + : WorkOrder(query_id), + filename_(filename), + text_offset_(text_offset), + text_segment_size_(text_segment_size), + field_terminator_(field_terminator), + process_escape_sequences_(process_escape_sequences), + output_destination_(DCHECK_NOTNULL(output_destination)) {} ~TextScanWorkOrder() override {} @@ -233,7 +249,6 @@ class TextScanWorkOrder : public WorkOrder { Tuple parseRow(const char **row_ptr, const CatalogRelationSchema &relation) const; - /** * @brief Parse up to three octal digits (0-7) starting at \p *literal_ptr as * a char literal. \p *literal_ptr will be modified to the last position @@ -297,7 +312,6 @@ class TextScanWorkOrder : public WorkOrder { const bool process_escape_sequences_; InsertDestination *output_destination_; - StorageManager *storage_manager_; DISALLOW_COPY_AND_ASSIGN(TextScanWorkOrder); }; http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/c9214ecb/relational_operators/TextScanOperator.proto ---------------------------------------------------------------------- diff --git a/relational_operators/TextScanOperator.proto b/relational_operators/TextScanOperator.proto deleted file mode 100644 index 8ead3f3..0000000 --- a/relational_operators/TextScanOperator.proto +++ /dev/null @@ -1,22 +0,0 @@ -// Copyright 2015 Pivotal Software, Inc. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -syntax = "proto2"; - -package quickstep.serialization; - -message TextBlob { - required fixed64 blob_id = 1; - required uint64 size = 2; -} http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/c9214ecb/relational_operators/UpdateOperator.cpp ---------------------------------------------------------------------- diff --git a/relational_operators/UpdateOperator.cpp b/relational_operators/UpdateOperator.cpp index 2130563..1b2979e 100644 --- a/relational_operators/UpdateOperator.cpp +++ b/relational_operators/UpdateOperator.cpp @@ -26,7 +26,9 @@ #include "query_execution/QueryContext.hpp" #include "query_execution/QueryExecutionMessages.pb.h" #include "query_execution/QueryExecutionUtil.hpp" +#include "query_execution/WorkOrderProtosContainer.hpp" #include "query_execution/WorkOrdersContainer.hpp" +#include "relational_operators/WorkOrder.pb.h" #include "storage/InsertDestination.hpp" #include "storage/StorageBlock.hpp" #include "storage/StorageBlockInfo.hpp" @@ -72,6 +74,27 @@ bool UpdateOperator::getAllWorkOrders( return started_; } +bool UpdateOperator::getAllWorkOrderProtos(WorkOrderProtosContainer *container) { + if (blocking_dependencies_met_ && !started_) { + for (const block_id input_block_id : input_blocks_) { + serialization::WorkOrder *proto = new serialization::WorkOrder; + proto->set_work_order_type(serialization::UPDATE); + proto->set_query_id(query_id_); + + proto->SetExtension(serialization::UpdateWorkOrder::operator_index, op_index_); + proto->SetExtension(serialization::UpdateWorkOrder::relation_id, relation_.getID()); + proto->SetExtension(serialization::UpdateWorkOrder::insert_destination_index, relocation_destination_index_); + proto->SetExtension(serialization::UpdateWorkOrder::predicate_index, predicate_index_); + proto->SetExtension(serialization::UpdateWorkOrder::update_group_index, update_group_index_); + proto->SetExtension(serialization::UpdateWorkOrder::block_id, input_block_id); + + container->addWorkOrderProto(proto, op_index_); + } + started_ = true; + } + return started_; +} + void UpdateWorkOrder::execute() { MutableBlockReference block( storage_manager_->getBlockMutable(input_block_id_, relation_)); http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/c9214ecb/relational_operators/UpdateOperator.hpp ---------------------------------------------------------------------- diff --git a/relational_operators/UpdateOperator.hpp b/relational_operators/UpdateOperator.hpp index cebb9b5..f6c5053 100644 --- a/relational_operators/UpdateOperator.hpp +++ b/relational_operators/UpdateOperator.hpp @@ -45,6 +45,7 @@ class InsertDestination; class Predicate; class Scalar; class StorageManager; +class WorkOrderProtosContainer; class WorkOrdersContainer; /** \addtogroup RelationalOperators @@ -99,6 +100,8 @@ class UpdateOperator : public RelationalOperator { const tmb::client_id scheduler_client_id, tmb::MessageBus *bus) override; + bool getAllWorkOrderProtos(WorkOrderProtosContainer *container) override; + QueryContext::insert_destination_id getInsertDestinationID() const override { return relocation_destination_index_; } http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/c9214ecb/relational_operators/WorkOrder.proto ---------------------------------------------------------------------- diff --git a/relational_operators/WorkOrder.proto b/relational_operators/WorkOrder.proto index 60d4c8f..3ed065a 100644 --- a/relational_operators/WorkOrder.proto +++ b/relational_operators/WorkOrder.proto @@ -232,23 +232,14 @@ message TextScanWorkOrder { } } -message TextSplitWorkOrder { - extend WorkOrder { - // All required. - optional uint64 operator_index = 320; - optional string filename = 321; - optional bool process_escape_sequences = 322; - } -} - message UpdateWorkOrder { extend WorkOrder { // All required. - optional uint64 operator_index = 336; - optional int32 relation_id = 337; - optional int32 insert_destination_index = 338; - optional int32 predicate_index = 339; - optional uint32 update_group_index = 340; - optional fixed64 block_id = 341; + optional uint64 operator_index = 320; + optional int32 relation_id = 321; + optional int32 insert_destination_index = 322; + optional int32 predicate_index = 323; + optional uint32 update_group_index = 324; + optional fixed64 block_id = 325; } } http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/c9214ecb/relational_operators/WorkOrderFactory.cpp ---------------------------------------------------------------------- diff --git a/relational_operators/WorkOrderFactory.cpp b/relational_operators/WorkOrderFactory.cpp index da42b4d..e078b84 100644 --- a/relational_operators/WorkOrderFactory.cpp +++ b/relational_operators/WorkOrderFactory.cpp @@ -396,8 +396,7 @@ WorkOrder* WorkOrderFactory::ReconstructFromProto(const serialization::WorkOrder proto.GetExtension(serialization::TextScanWorkOrder::field_terminator), proto.GetExtension(serialization::TextScanWorkOrder::process_escape_sequences), query_context->getInsertDestination( - proto.GetExtension(serialization::TextScanWorkOrder::insert_destination_index)), - storage_manager); + proto.GetExtension(serialization::TextScanWorkOrder::insert_destination_index))); } case serialization::UPDATE: { LOG(INFO) << "Creating UpdateWorkOrder"; @@ -425,6 +424,10 @@ WorkOrder* WorkOrderFactory::ReconstructFromProto(const serialization::WorkOrder bool WorkOrderFactory::ProtoIsValid(const serialization::WorkOrder &proto, const CatalogDatabaseLite &catalog_database, const QueryContext &query_context) { + if (!proto.IsInitialized()) { + return false; + } + switch (proto.work_order_type()) { case serialization::AGGREGATION: { return proto.HasExtension(serialization::AggregationWorkOrder::block_id) &&