Renamed QueryResultRelationMessage and its response message to SaveQueryResultMessage and SaveQueryResultResponseMessage.
Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/aaecc76b Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/aaecc76b Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/aaecc76b Branch: refs/heads/LIP-for-tpch Commit: aaecc76b7bea85f46bf06dc2e63fccf43636d7eb Parents: 33554c3 Author: Zuyu Zhang <zu...@twitter.com> Authored: Thu Jul 28 11:11:33 2016 -0700 Committer: Zuyu Zhang <zu...@twitter.com> Committed: Thu Jul 28 11:11:33 2016 -0700 ---------------------------------------------------------------------- query_execution/QueryExecutionMessages.proto | 4 +- query_execution/QueryExecutionTypedefs.hpp | 4 +- query_execution/Shiftboss.cpp | 45 +++++++++++------------ query_execution/Shiftboss.hpp | 4 +- 4 files changed, 28 insertions(+), 29 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/aaecc76b/query_execution/QueryExecutionMessages.proto ---------------------------------------------------------------------- diff --git a/query_execution/QueryExecutionMessages.proto b/query_execution/QueryExecutionMessages.proto index 591ca6c..308d736 100644 --- a/query_execution/QueryExecutionMessages.proto +++ b/query_execution/QueryExecutionMessages.proto @@ -113,12 +113,12 @@ message InitiateRebuildResponseMessage { required uint64 num_rebuild_work_orders = 3; } -message QueryResultRelationMessage { +message SaveQueryResultMessage { required int32 relation_id = 1; repeated fixed64 blocks = 2 [packed=true]; } -message QueryResultRelationResponseMessage { +message SaveQueryResultResponseMessage { required int32 relation_id = 1; } http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/aaecc76b/query_execution/QueryExecutionTypedefs.hpp ---------------------------------------------------------------------- diff --git 
a/query_execution/QueryExecutionTypedefs.hpp b/query_execution/QueryExecutionTypedefs.hpp index d73d4ee..b67209f 100644 --- a/query_execution/QueryExecutionTypedefs.hpp +++ b/query_execution/QueryExecutionTypedefs.hpp @@ -81,8 +81,8 @@ enum QueryExecutionMessageType : message_type_id { kInitiateRebuildMessage, // From Foreman to Shiftboss. kInitiateRebuildResponseMessage, // From Shiftboss to Foreman. - kQueryResultRelationMessage, // From Foreman to Shiftboss. - kQueryResultRelationResponseMessage, // From Shiftboss to Foreman. + kSaveQueryResultMessage, // From Foreman to Shiftboss. + kSaveQueryResultResponseMessage, // From Shiftboss to Foreman. // BlockLocator related messages, sorted in a life cycle of StorageManager // with a unique block domain. http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/aaecc76b/query_execution/Shiftboss.cpp ---------------------------------------------------------------------- diff --git a/query_execution/Shiftboss.cpp b/query_execution/Shiftboss.cpp index af56306..7f655c6 100644 --- a/query_execution/Shiftboss.cpp +++ b/query_execution/Shiftboss.cpp @@ -149,11 +149,10 @@ void Shiftboss::run() { move(annotated_message.tagged_message)); break; } - case kQueryResultRelationMessage: { - // TODO(zuyu): Rename to kSaveQueryResultMessage. 
+ case kSaveQueryResultMessage: { const TaggedMessage &tagged_message = annotated_message.tagged_message; - serialization::QueryResultRelationMessage proto; + serialization::SaveQueryResultMessage proto; CHECK(proto.ParseFromArray(tagged_message.message(), tagged_message.message_bytes())); for (int i = 0; i < proto.blocks_size(); ++i) { @@ -168,25 +167,25 @@ void Shiftboss::run() { } } - serialization::QueryResultRelationResponseMessage ack_proto; - ack_proto.set_relation_id(proto.relation_id()); + serialization::SaveQueryResultResponseMessage proto_response; + proto_response.set_relation_id(proto.relation_id()); - const size_t ack_proto_length = ack_proto.ByteSize(); - char *ack_proto_bytes = static_cast<char*>(malloc(ack_proto_length)); - CHECK(ack_proto.SerializeToArray(ack_proto_bytes, ack_proto_length)); + const size_t proto_response_length = proto_response.ByteSize(); + char *proto_response_bytes = static_cast<char*>(malloc(proto_response_length)); + CHECK(proto_response.SerializeToArray(proto_response_bytes, proto_response_length)); - TaggedMessage ack_message(static_cast<const void*>(ack_proto_bytes), - ack_proto_length, - kQueryResultRelationResponseMessage); - free(ack_proto_bytes); + TaggedMessage message_response(static_cast<const void*>(proto_response_bytes), + proto_response_length, + kSaveQueryResultResponseMessage); + free(proto_response_bytes); LOG(INFO) << "Shiftboss (id '" << shiftboss_client_id_ - << "') sent QueryResultRelationResponseMessage (typed '" << kQueryResultRelationResponseMessage - << ") to Foreman"; + << "') sent SaveQueryResultResponseMessage (typed '" << kSaveQueryResultResponseMessage + << "') to Foreman"; QueryExecutionUtil::SendTMBMessage(bus_, shiftboss_client_id_, foreman_client_id_, - move(ack_message)); + move(message_response)); break; } case kPoisonMessage: { @@ -280,15 +279,15 @@ void Shiftboss::processQueryInitiateMessage( char *proto_bytes = static_cast<char*>(malloc(proto_length)); 
CHECK(proto.SerializeToArray(proto_bytes, proto_length)); - TaggedMessage ack_message(static_cast<const void*>(proto_bytes), - proto_length, - kQueryInitiateResponseMessage); + TaggedMessage message_response(static_cast<const void*>(proto_bytes), + proto_length, + kQueryInitiateResponseMessage); free(proto_bytes); QueryExecutionUtil::SendTMBMessage(bus_, shiftboss_client_id_, foreman_client_id_, - move(ack_message)); + move(message_response)); } void Shiftboss::processInitiateRebuildMessage(const std::size_t query_id, @@ -317,15 +316,15 @@ void Shiftboss::processInitiateRebuildMessage(const std::size_t query_id, char *proto_bytes = static_cast<char*>(malloc(proto_length)); CHECK(proto.SerializeToArray(proto_bytes, proto_length)); - TaggedMessage ack_message(static_cast<const void*>(proto_bytes), - proto_length, - kInitiateRebuildResponseMessage); + TaggedMessage message_response(static_cast<const void*>(proto_bytes), + proto_length, + kInitiateRebuildResponseMessage); free(proto_bytes); QueryExecutionUtil::SendTMBMessage(bus_, shiftboss_client_id_, foreman_client_id_, - move(ack_message)); + move(message_response)); for (size_t i = 0; i < partially_filled_block_refs.size(); ++i) { // NOTE(zuyu): Worker releases the memory after the execution of http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/aaecc76b/query_execution/Shiftboss.hpp ---------------------------------------------------------------------- diff --git a/query_execution/Shiftboss.hpp b/query_execution/Shiftboss.hpp index 096ab74..9e24d62 100644 --- a/query_execution/Shiftboss.hpp +++ b/query_execution/Shiftboss.hpp @@ -112,8 +112,8 @@ class Shiftboss : public Thread { bus_->RegisterClientAsReceiver(shiftboss_client_id_, kRebuildWorkOrderCompleteMessage); bus_->RegisterClientAsSender(shiftboss_client_id_, kRebuildWorkOrderCompleteMessage); - bus_->RegisterClientAsReceiver(shiftboss_client_id_, kQueryResultRelationMessage); - bus_->RegisterClientAsSender(shiftboss_client_id_, 
kQueryResultRelationResponseMessage); + bus_->RegisterClientAsReceiver(shiftboss_client_id_, kSaveQueryResultMessage); + bus_->RegisterClientAsSender(shiftboss_client_id_, kSaveQueryResultResponseMessage); // Stop itself. bus_->RegisterClientAsReceiver(shiftboss_client_id_, kPoisonMessage);