Refactored command execution in the distributed version.
Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/7f25d1c1 Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/7f25d1c1 Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/7f25d1c1 Branch: refs/heads/reorder-partitioned-hash-join Commit: 7f25d1c1473def1f6b5733bf55673f91d11d0195 Parents: ccb2852 Author: Zuyu Zhang <zu...@apache.org> Authored: Thu Mar 2 21:30:28 2017 -0800 Committer: Zuyu Zhang <zu...@apache.org> Committed: Thu Mar 2 21:30:28 2017 -0800 ---------------------------------------------------------------------- cli/distributed/Cli.cpp | 54 +++------ cli/distributed/Conductor.cpp | 137 ++++++++-------------- cli/distributed/Conductor.hpp | 5 - query_execution/QueryExecutionMessages.proto | 4 - query_execution/QueryExecutionTypedefs.hpp | 4 +- 5 files changed, 67 insertions(+), 137 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/7f25d1c1/cli/distributed/Cli.cpp ---------------------------------------------------------------------- diff --git a/cli/distributed/Cli.cpp b/cli/distributed/Cli.cpp index 49b7dc1..63f3259 100644 --- a/cli/distributed/Cli.cpp +++ b/cli/distributed/Cli.cpp @@ -123,7 +123,7 @@ void Cli::init() { data_exchanger_.set_storage_manager(storage_manager_.get()); data_exchanger_.start(); - // Prepare for submitting a query. + // Prepare for submitting a query or a command. bus_.RegisterClientAsSender(cli_id_, kSqlQueryMessage); bus_.RegisterClientAsReceiver(cli_id_, kQueryExecutionSuccessMessage); @@ -131,8 +131,6 @@ void Cli::init() { bus_.RegisterClientAsReceiver(cli_id_, kQueryExecutionErrorMessage); - // Prepare for submitting a command. - bus_.RegisterClientAsSender(cli_id_, kCommandMessage); bus_.RegisterClientAsReceiver(cli_id_, kCommandResponseMessage); } @@ -166,50 +164,36 @@ void Cli::run() { } if (statement.getStatementType() == ParseStatement::kCommand) { - const ParseCommand &command = static_cast<const ParseCommand &>(statement); - const std::string &command_str = command.command()->value(); + const ParseCommand &parse_command = static_cast<const ParseCommand &>(statement); + const std::string &command = parse_command.command()->value(); try { - if (command_str == C::kAnalyzeCommand) { + if (command == C::kAnalyzeCommand) { // TODO(zuyu): support '\analyze'. - THROW_SQL_ERROR_AT(command.command()) << "Unsupported Command"; - } else if (command_str != C::kDescribeDatabaseCommand && - command_str != C::kDescribeTableCommand) { - THROW_SQL_ERROR_AT(command.command()) << "Invalid Command"; + THROW_SQL_ERROR_AT(parse_command.command()) << "Unsupported Command"; + } else if (command != C::kDescribeDatabaseCommand && + command != C::kDescribeTableCommand) { + THROW_SQL_ERROR_AT(parse_command.command()) << "Invalid Command"; } } catch (const SqlError &error) { fprintf(stderr, "%s", error.formatMessage(*command_string).c_str()); reset_parser = true; break; } + } - DLOG(INFO) << "DistributedCli sent CommandMessage (typed '" << kCommandMessage - << "') to Conductor"; - S::CommandMessage proto; - proto.set_command(*command_string); - - const size_t proto_length = proto.ByteSize(); - char *proto_bytes = static_cast<char*>(malloc(proto_length)); - CHECK(proto.SerializeToArray(proto_bytes, proto_length)); - - TaggedMessage command_message(static_cast<const void*>(proto_bytes), proto_length, kCommandMessage); - free(proto_bytes); - - QueryExecutionUtil::SendTMBMessage(&bus_, cli_id_, conductor_client_id_, move(command_message)); - } else { - DLOG(INFO) << "DistributedCli sent SqlQueryMessage (typed '" << kSqlQueryMessage - << "') to Conductor"; - S::SqlQueryMessage proto; - proto.set_sql_query(*command_string); + DLOG(INFO) << "DistributedCli sent SqlQueryMessage (typed '" << kSqlQueryMessage + << "') to Conductor"; + S::SqlQueryMessage proto; + proto.set_sql_query(*command_string); - const size_t proto_length = proto.ByteSize(); - char *proto_bytes = static_cast<char*>(malloc(proto_length)); - CHECK(proto.SerializeToArray(proto_bytes, proto_length)); + const size_t proto_length = proto.ByteSize(); + char *proto_bytes = static_cast<char*>(malloc(proto_length)); + CHECK(proto.SerializeToArray(proto_bytes, proto_length)); - TaggedMessage sql_query_message(static_cast<const void*>(proto_bytes), proto_length, kSqlQueryMessage); - free(proto_bytes); + TaggedMessage sql_query_message(static_cast<const void*>(proto_bytes), proto_length, kSqlQueryMessage); + free(proto_bytes); - QueryExecutionUtil::SendTMBMessage(&bus_, cli_id_, conductor_client_id_, move(sql_query_message)); - } + QueryExecutionUtil::SendTMBMessage(&bus_, cli_id_, conductor_client_id_, move(sql_query_message)); start = std::chrono::steady_clock::now(); http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/7f25d1c1/cli/distributed/Conductor.cpp ---------------------------------------------------------------------- diff --git a/cli/distributed/Conductor.cpp b/cli/distributed/Conductor.cpp index b877b04..1b8bfb2 100644 --- a/cli/distributed/Conductor.cpp +++ b/cli/distributed/Conductor.cpp @@ -95,21 +95,18 @@ void Conductor::init() { bus_.RegisterClientAsReceiver(conductor_client_id_, kDistributedCliRegistrationMessage); bus_.RegisterClientAsSender(conductor_client_id_, kDistributedCliRegistrationResponseMessage); - bus_.RegisterClientAsReceiver(conductor_client_id_, kCommandMessage); - bus_.RegisterClientAsSender(conductor_client_id_, kCommandResponseMessage); - bus_.RegisterClientAsReceiver(conductor_client_id_, kSqlQueryMessage); + bus_.RegisterClientAsSender(conductor_client_id_, kCommandResponseMessage); bus_.RegisterClientAsSender(conductor_client_id_, kQueryExecutionErrorMessage); bus_.RegisterClientAsSender(conductor_client_id_, kAdmitRequestMessage); + bus_.RegisterClientAsReceiver(conductor_client_id_, kQueryResultTeardownMessage); block_locator_ = make_unique<BlockLocator>(&bus_); block_locator_->start(); - foreman_ = make_unique<ForemanDistributed>(*block_locator_, - std::bind(&QueryProcessor::saveCatalog, query_processor_.get()), &bus_, - catalog_database_); + foreman_ = make_unique<ForemanDistributed>(*block_locator_, &bus_, catalog_database_, query_processor_.get()); foreman_->start(); } @@ -132,14 +129,6 @@ void Conductor::run() { QueryExecutionUtil::SendTMBMessage(&bus_, conductor_client_id_, sender, move(message))); break; } - case kCommandMessage: { - S::CommandMessage proto; - CHECK(proto.ParseFromArray(tagged_message.message(), tagged_message.message_bytes())); - DLOG(INFO) << "Conductor received the following command: " << proto.command(); - - processCommandMessage(sender, new string(move(proto.command()))); - break; - } case kSqlQueryMessage: { S::SqlQueryMessage proto; CHECK(proto.ParseFromArray(tagged_message.message(), tagged_message.message_bytes())); @@ -161,91 +150,59 @@ void Conductor::run() { } } -void Conductor::processCommandMessage(const tmb::client_id sender, string *command_string) { - parser_wrapper_.feedNextBuffer(command_string); - ParseResult parse_result = parser_wrapper_.getNextStatement(); - +void Conductor::processSqlQueryMessage(const tmb::client_id sender, string *command_string) { + SqlParserWrapper parser_wrapper; + parser_wrapper.feedNextBuffer(command_string); + ParseResult parse_result = parser_wrapper.getNextStatement(); CHECK(parse_result.condition == ParseResult::kSuccess) << "Any syntax error should be addressed in the DistributedCli."; const ParseStatement &statement = *parse_result.parsed_statement; - DCHECK_EQ(ParseStatement::kCommand, statement.getStatementType()); - - const ParseCommand &command = static_cast<const ParseCommand &>(statement); - const PtrVector<ParseString> &arguments = *(command.arguments()); - const string &command_str = command.command()->value(); - - string command_response; try { - if (command_str == C::kDescribeDatabaseCommand) { - command_response = C::ExecuteDescribeDatabase(arguments, *catalog_database_); - } else if (command_str == C::kDescribeTableCommand) { - if (arguments.empty()) { + if (statement.getStatementType() == ParseStatement::kCommand) { + const ParseCommand &parse_command = static_cast<const ParseCommand &>(statement); + const PtrVector<ParseString> &arguments = *(parse_command.arguments()); + const string &command = parse_command.command()->value(); + + string command_response; + if (command == C::kDescribeDatabaseCommand) { command_response = C::ExecuteDescribeDatabase(arguments, *catalog_database_); - } else { - command_response = C::ExecuteDescribeTable(arguments, *catalog_database_); + } else if (command == C::kDescribeTableCommand) { + if (arguments.empty()) { + command_response = C::ExecuteDescribeDatabase(arguments, *catalog_database_); + } else { + command_response = C::ExecuteDescribeTable(arguments, *catalog_database_); + } } - } - } catch (const SqlError &command_error) { - // Set the query execution status along with the error message. - S::QueryExecutionErrorMessage proto; - proto.set_error_message(command_error.formatMessage(*command_string)); - - const size_t proto_length = proto.ByteSize(); - char *proto_bytes = static_cast<char*>(malloc(proto_length)); - CHECK(proto.SerializeToArray(proto_bytes, proto_length)); - - TaggedMessage message(static_cast<const void*>(proto_bytes), - proto_length, - kQueryExecutionErrorMessage); - free(proto_bytes); - - DLOG(INFO) << "Conductor sent QueryExecutionErrorMessage (typed '" - << kQueryExecutionErrorMessage - << "') to Distributed CLI " << sender; - CHECK(MessageBus::SendStatus::kOK == - QueryExecutionUtil::SendTMBMessage(&bus_, conductor_client_id_, sender, move(message))); - } - - S::CommandResponseMessage proto; - proto.set_command_response(command_response); - - const size_t proto_length = proto.ByteSize(); - char *proto_bytes = static_cast<char*>(malloc(proto_length)); - CHECK(proto.SerializeToArray(proto_bytes, proto_length)); - TaggedMessage message(static_cast<const void*>(proto_bytes), proto_length, kCommandResponseMessage); - free(proto_bytes); - - DLOG(INFO) << "Conductor sent CommandResponseMessage (typed '" << kCommandResponseMessage - << "') to Distributed CLI " << sender; - CHECK(MessageBus::SendStatus::kOK == - QueryExecutionUtil::SendTMBMessage(&bus_, conductor_client_id_, sender, move(message))); -} - -void Conductor::processSqlQueryMessage(const tmb::client_id sender, string *command_string) { - parser_wrapper_.feedNextBuffer(command_string); - ParseResult parse_result = parser_wrapper_.getNextStatement(); - - CHECK(parse_result.condition == ParseResult::kSuccess) - << "Any SQL syntax error should be addressed in the DistributedCli."; - - const ParseStatement &statement = *parse_result.parsed_statement; - DCHECK_NE(ParseStatement::kCommand, statement.getStatementType()); - - try { - auto query_handle = make_unique<QueryHandle>(query_processor_->query_id(), - sender, - statement.getPriority()); - query_processor_->generateQueryHandle(statement, query_handle.get()); - DCHECK(query_handle->getQueryPlanMutable() != nullptr); - - QueryExecutionUtil::ConstructAndSendAdmitRequestMessage( - conductor_client_id_, - foreman_->getBusClientID(), - query_handle.release(), - &bus_); + S::CommandResponseMessage proto; + proto.set_command_response(command_response); + + const size_t proto_length = proto.ByteSize(); + char *proto_bytes = static_cast<char*>(malloc(proto_length)); + CHECK(proto.SerializeToArray(proto_bytes, proto_length)); + + TaggedMessage message(static_cast<const void*>(proto_bytes), proto_length, kCommandResponseMessage); + free(proto_bytes); + + DLOG(INFO) << "Conductor sent CommandResponseMessage (typed '" << kCommandResponseMessage + << "') to Distributed CLI " << sender; + CHECK(MessageBus::SendStatus::kOK == + QueryExecutionUtil::SendTMBMessage(&bus_, conductor_client_id_, sender, move(message))); + } else { + auto query_handle = make_unique<QueryHandle>(query_processor_->query_id(), + sender, + statement.getPriority()); + query_processor_->generateQueryHandle(statement, query_handle.get()); + DCHECK(query_handle->getQueryPlanMutable() != nullptr); + + QueryExecutionUtil::ConstructAndSendAdmitRequestMessage( + conductor_client_id_, + foreman_->getBusClientID(), + query_handle.release(), + &bus_); + } } catch (const SqlError &sql_error) { // Set the query execution status along with the error message. S::QueryExecutionErrorMessage proto; http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/7f25d1c1/cli/distributed/Conductor.hpp ---------------------------------------------------------------------- diff --git a/cli/distributed/Conductor.hpp b/cli/distributed/Conductor.hpp index e7e003f..0c0f7e5 100644 --- a/cli/distributed/Conductor.hpp +++ b/cli/distributed/Conductor.hpp @@ -24,7 +24,6 @@ #include <string> #include "cli/distributed/Role.hpp" -#include "parser/SqlParserWrapper.hpp" #include "query_execution/BlockLocator.hpp" #include "query_execution/ForemanDistributed.hpp" #include "query_optimizer/QueryProcessor.hpp" @@ -61,12 +60,8 @@ class Conductor final : public Role { void run() override; private: - void processCommandMessage(const tmb::client_id sender, std::string *command_string); - void processSqlQueryMessage(const tmb::client_id sender, std::string *command_string); - SqlParserWrapper parser_wrapper_; - std::unique_ptr<QueryProcessor> query_processor_; // Not owned. CatalogDatabase *catalog_database_; http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/7f25d1c1/query_execution/QueryExecutionMessages.proto ---------------------------------------------------------------------- diff --git a/query_execution/QueryExecutionMessages.proto b/query_execution/QueryExecutionMessages.proto index 47246d8..a45e8df 100644 --- a/query_execution/QueryExecutionMessages.proto +++ b/query_execution/QueryExecutionMessages.proto @@ -81,10 +81,6 @@ message ShiftbossRegistrationResponseMessage { required uint64 shiftboss_index = 1; } -message CommandMessage { - required string command = 1; -} - message SqlQueryMessage { required string sql_query = 1; } http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/7f25d1c1/query_execution/QueryExecutionTypedefs.hpp ---------------------------------------------------------------------- diff --git a/query_execution/QueryExecutionTypedefs.hpp b/query_execution/QueryExecutionTypedefs.hpp index 0fd0bdf..a49de5e 100644 --- a/query_execution/QueryExecutionTypedefs.hpp +++ b/query_execution/QueryExecutionTypedefs.hpp @@ -90,9 +90,7 @@ enum QueryExecutionMessageType : message_type_id { kDistributedCliRegistrationMessage, // From CLI to Conductor. kDistributedCliRegistrationResponseMessage, // From Conductor to CLI. - // From CLI to Conductor. - kCommandMessage, - kSqlQueryMessage, + kSqlQueryMessage, // From CLI to Conductor. kQueryInitiateMessage, // From Foreman to Shiftboss. kQueryInitiateResponseMessage, // From Shiftboss to Foreman.