jamesge commented on a change in pull request #972: Redis server protocol
URL: https://github.com/apache/incubator-brpc/pull/972#discussion_r353119006
##########
File path: src/brpc/policy/redis_protocol.cpp
##########
@@ -52,62 +61,348 @@ struct InputResponse : public InputMessageBase {
     }
 };
 
-// "Message" = "Response" as we only implement the client for redis.
-ParseResult ParseRedisMessage(butil::IOBuf* source, Socket* socket,
-                              bool /*read_eof*/, const void* /*arg*/) {
-    if (source->empty()) {
-        return MakeParseError(PARSE_ERROR_NOT_ENOUGH_DATA);
+static const char** ParseArgs(const RedisReply& message) {
+    if (!message.is_array() || message.size() == 0) {
+        LOG(WARNING) << "request message is not an array or has zero size";
+        return NULL;
     }
-    // NOTE(gejun): PopPipelinedInfo() is actually more contended than what
-    // I thought before. The Socket._pipeline_q is a SPSC queue pushed before
-    // sending and popped when response comes back, being protected by a
-    // mutex. Previously the mutex is shared with Socket._id_wait_list. When
-    // 200 bthreads access one redis-server, ~1.5s in total is spent on
-    // contention in 10-second duration. If the mutex is separated, the time
-    // drops to ~0.25s. I further replaced PeekPipelinedInfo() with
-    // GivebackPipelinedInfo() to lock only once(when receiving response)
-    // in most cases, and the time decreases to ~0.14s.
-    PipelinedInfo pi;
-    if (!socket->PopPipelinedInfo(&pi)) {
-        LOG(WARNING) << "No corresponding PipelinedInfo in socket";
-        return MakeParseError(PARSE_ERROR_TRY_OTHERS);
+    const char** args = (const char**)
+        malloc(sizeof(const char*) * (message.size() + 1 /* NULL */));
+    for (size_t i = 0; i < message.size(); ++i) {
+        if (!message[i].is_string()) {
+            LOG(WARNING) << "request message[" << i << "] is not a string";
+            free(args);
+            return NULL;
+        }
+        args[i] = message[i].c_str();
     }
+    args[message.size()] = NULL;
+    return args;
+}
+
+// Each redis command corresponds to one ConsumeTaskDone. Once the user
+// has finished handling the command and calls done->Run()
+// (see redis.h for details), RedisConnContext::Flush() is called and
+// flushes responses to the client in the order the commands arrived.
+class ConsumeTaskDone;
+
+// This class is used as the parsing_context of the socket.
+class RedisConnContext : public SharedObject
+                       , public Destroyable {
+public:
+    RedisConnContext()
+        : handler_continue(NULL)
+        , message_count(0) {}
+    ~RedisConnContext();
+    // @Destroyable
+    void Destroy();
+
+    int Init();
+    // Push `done` to a queue which is read by Flush().
+    void Push(ConsumeTaskDone* done);
+    void Flush();
+    void ClearSentDones();
+
+    SocketId socket_id;
+    RedisService::CommandMap command_map;
+    // If the user starts a transaction, handler_continue points to the
+    // handler that triggered the transaction.
+    RedisCommandHandler* handler_continue;
+    // Parsed redis commands are pushed into this queue
+    bthread::ExecutionQueueId<ConsumeTaskDone*> queue;
+
+    RedisReply parsing_message;
+    butil::Arena arena;
+    int64_t message_count;
+private:
+    void AddSentDone(ConsumeTaskDone* done);
 
-    do {
-        InputResponse* msg = static_cast<InputResponse*>(socket->parsing_context());
-        if (msg == NULL) {
-            msg = new InputResponse;
-            socket->reset_parsing_context(msg);
+    bool _writing = false;
+    butil::Mutex _mutex;
+    std::queue<ConsumeTaskDone*> _dones;
+
+    butil::Mutex _dones_sent_mutex;
+    std::queue<ConsumeTaskDone*> _dones_sent;
+};
+
+class ConsumeTaskDone : public google::protobuf::Closure {
+public:
+    ConsumeTaskDone()
+        : _ready(false)
+        , output_message(&arena)
+        , ctx(NULL) {}
+
+    void Run() override;
+    bool IsReady() { return _ready.load(butil::memory_order_acquire); }
+
+private:
+    butil::atomic<bool> _ready;
+
+public:
+    RedisReply input_message;
+    RedisReply output_message;
+    RedisConnContext* ctx;
+    butil::IOBuf sendbuf;
+    butil::Arena arena;
+};
+
+int ConsumeTask(RedisConnContext* ctx, ConsumeTaskDone* done) {
+    ClosureGuard done_guard(done);
+    done->ctx = ctx;
+    ctx->Push(done);
+    RedisReply& output = done->output_message;
+
+    const char** args = ParseArgs(done->input_message);
+    if (!args) {
+        output.SetError("ERR command not string");
+        return -1;
+    }
+    if (ctx->handler_continue) {
+        RedisCommandHandler::Result result = ctx->handler_continue->Run(
+            args, &output, done_guard.release());
+        if (result == RedisCommandHandler::OK) {
+            ctx->handler_continue = NULL;
         }
+    } else {
+        std::string comm;
+        comm.reserve(8);
+        for (const char* c = done->input_message[0].c_str(); *c; ++c) {
+            comm.push_back(std::tolower(*c));
+        }
+        auto it = ctx->command_map.find(comm);
+        if (it == ctx->command_map.end()) {
+            char buf[64];
+            snprintf(buf, sizeof(buf), "ERR unknown command `%s`", comm.c_str());
+            output.SetError(buf);
+        } else {
+            RedisCommandHandler::Result result =
+                it->second->Run(args, &output, done_guard.release());
+            if (result == RedisCommandHandler::CONTINUE) {
+                ctx->handler_continue = it->second.get();
+            }
+        }
+    }
+    free(args);
+    return 0;
+}
+
+int Consume(void* ctx, bthread::TaskIterator<ConsumeTaskDone*>& iter) {
+    RedisConnContext* qctx = static_cast<RedisConnContext*>(ctx);
+    if (iter.is_queue_stopped()) {
+        qctx->RemoveRefManually();
+        return 0;
+    }
+    for (; iter; ++iter) {
+        ConsumeTask(qctx, *iter);
+    }
+    return 0;
+}
 
-    const int consume_count = (pi.with_auth ? 1 : pi.count);
+// ========== impl of RedisConnContext ==========
 
-    ParseError err = msg->response.ConsumePartialIOBuf(*source, consume_count);
+RedisConnContext::~RedisConnContext() {
+    ClearSentDones();
+    while (!_dones.empty()) {
+        ConsumeTaskDone* head = _dones.front();
+        _dones.pop();
+        delete head;
+    }
+}
+
+void RedisConnContext::Destroy() {
+    bthread::execution_queue_stop(queue);
+}
+
+int RedisConnContext::Init() {
+    bthread::ExecutionQueueOptions q_opt;
+    q_opt.bthread_attr =
+        FLAGS_usercode_in_pthread ?
+        BTHREAD_ATTR_PTHREAD : BTHREAD_ATTR_NORMAL;
+    if (bthread::execution_queue_start(&queue, &q_opt, Consume, this) != 0) {
+        LOG(ERROR) << "Fail to start execution queue";
+        return -1;
+    }
+    return 0;
+}
+
+void RedisConnContext::Push(ConsumeTaskDone* done) {
+    std::unique_lock<butil::Mutex> m(_mutex);
+    _dones.push(done);
+}
+void RedisConnContext::Flush() {
+    SocketUniquePtr s;
+    if (Socket::Address(socket_id, &s) != 0) {
+        LOG(WARNING) << "Fail to address redis socket";
+        return;
+    }
+    {
+        std::unique_lock<butil::Mutex> m(_mutex);
+        if (_writing) return;

Review comment:
   This doesn't conform to the code style.
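
For context on the server-side API under review, here is a minimal sketch of
a user-side handler under this patch. The Run(args, output, done) signature,
the Result values OK/CONTINUE, the lowercased command lookup, and
RedisReply::SetError() are all taken from the diff above; the brpc::
qualification, the "brpc/redis.h" header path, the SetString()/SetNilString()
reply setters, and the in-memory map are illustrative assumptions, not
something the diff confirms.

    // Hypothetical user code, NOT part of this patch: a "get" handler
    // backed by an in-memory map. `args` is the NULL-terminated array
    // built by ParseArgs(); args[0] is the command name, args[1] the key.
    #include <string>
    #include <unordered_map>
    #include "brpc/closure_guard.h"
    #include "brpc/redis.h"  // assumed header, per the "see redis.h" comment

    class GetCommandHandler : public brpc::RedisCommandHandler {
    public:
        explicit GetCommandHandler(
                std::unordered_map<std::string, std::string>* kv)
            : _kv(kv) {}

        Result Run(const char** args,
                   brpc::RedisReply* output,
                   google::protobuf::Closure* done) override {
            // ConsumeTask() released `done` before calling Run(), so the
            // handler owns it and must run it once the reply is filled in;
            // the guard does that on scope exit, which drives Flush().
            brpc::ClosureGuard done_guard(done);
            if (args[1] == NULL || args[2] != NULL) {
                output->SetError(
                    "ERR wrong number of arguments for 'get' command");
                return OK;
            }
            auto it = _kv->find(args[1]);
            if (it != _kv->end()) {
                output->SetString(it->second);  // assumed, mirrors SetError()
            } else {
                output->SetNilString();         // assumed setter, redis nil
            }
            return OK;  // OK, not CONTINUE: "get" starts no transaction
        }

    private:
        std::unordered_map<std::string, std::string>* _kv;  // not owned
    };

Registration would presumably go into RedisService::CommandMap under the
lowercase key "get", since ConsumeTask() lowercases args[0] before the lookup.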