jamesge commented on a change in pull request #972: Redis server protocol URL: https://github.com/apache/incubator-brpc/pull/972#discussion_r359138399
########## File path: src/brpc/policy/redis_protocol.cpp ########## @@ -52,62 +54,202 @@ struct InputResponse : public InputMessageBase { } }; -// "Message" = "Response" as we only implement the client for redis. -ParseResult ParseRedisMessage(butil::IOBuf* source, Socket* socket, - bool /*read_eof*/, const void* /*arg*/) { - if (source->empty()) { - return MakeParseError(PARSE_ERROR_NOT_ENOUGH_DATA); +// This class is as parsing_context in socket. +class RedisConnContext : public Destroyable { +public: + RedisConnContext() + : redis_service(NULL) + , batched_size(0) {} + + ~RedisConnContext(); + // @Destroyable + void Destroy() override; + + SocketId socket_id; + RedisService* redis_service; + // If user starts a transaction, transaction_handler indicates the + // handler pointer that runs the transaction command. + std::unique_ptr<RedisCommandHandler> transaction_handler; + // >0 if command handler is run in batched mode. + int batched_size; + + RedisCommandParser parser; +}; + +int ConsumeCommand(RedisConnContext* ctx, + const std::unique_ptr<const char*[]>& commands, + int command_len, butil::Arena* arena, + bool is_last, + butil::IOBuf* sendbuf) { + RedisReply output(arena); + RedisCommandHandler::Result result = RedisCommandHandler::OK; + if (ctx->transaction_handler) { + result = ctx->transaction_handler->Run( + command_len, commands.get(), &output, is_last); + if (result == RedisCommandHandler::OK) { + ctx->transaction_handler.reset(NULL); + } else if (result == RedisCommandHandler::BATCHED) { + LOG(ERROR) << "BATCHED should not be returned by a transaction handler."; + return -1; + } + } else { + RedisCommandHandler* ch = ctx->redis_service->FindCommandHandler(commands[0]); + if (!ch) { + char buf[64]; + snprintf(buf, sizeof(buf), "ERR unknown command `%s`", commands[0]); + output.SetError(buf); + } else { + result = ch->Run(command_len, commands.get(), &output, is_last); + if (result == RedisCommandHandler::CONTINUE) { + if (ctx->batched_size) { + 
LOG(ERROR) << "CONTINUE should not be returned in redis batched process."; + return -1; + } + ctx->transaction_handler.reset(ch->NewTransactionHandler()); + } else if (result == RedisCommandHandler::BATCHED) { + ctx->batched_size++; + } + } } - // NOTE(gejun): PopPipelinedInfo() is actually more contended than what - // I thought before. The Socket._pipeline_q is a SPSC queue pushed before - // sending and popped when response comes back, being protected by a - // mutex. Previously the mutex is shared with Socket._id_wait_list. When - // 200 bthreads access one redis-server, ~1.5s in total is spent on - // contention in 10-second duration. If the mutex is separated, the time - // drops to ~0.25s. I further replaced PeekPipelinedInfo() with - // GivebackPipelinedInfo() to lock only once(when receiving response) - // in most cases, and the time decreases to ~0.14s. - PipelinedInfo pi; - if (!socket->PopPipelinedInfo(&pi)) { - LOG(WARNING) << "No corresponding PipelinedInfo in socket"; - return MakeParseError(PARSE_ERROR_TRY_OTHERS); + if (result == RedisCommandHandler::OK) { + if (ctx->batched_size) { + if ((int)output.size() != (ctx->batched_size + 1)) { + LOG(ERROR) << "reply array size can't be matched with batched size, " + << " expected=" << ctx->batched_size + 1 << " actual=" << output.size(); + return -1; + } + for (int i = 0; i < (int)output.size(); ++i) { + output[i].SerializeTo(sendbuf); + } + ctx->batched_size = 0; + } else { + output.SerializeTo(sendbuf); + } + } else if (result == RedisCommandHandler::CONTINUE) { + output.SerializeTo(sendbuf); + } else if (result == RedisCommandHandler::BATCHED) { + // just do nothing and wait handler to return OK. 
+ } else { + LOG(ERROR) << "unknown status=" << result; + return -1; } + return 0; +} - do { - InputResponse* msg = static_cast<InputResponse*>(socket->parsing_context()); - if (msg == NULL) { - msg = new InputResponse; - socket->reset_parsing_context(msg); - } +// ========== impl of RedisConnContext ========== + +RedisConnContext::~RedisConnContext() { } + +void RedisConnContext::Destroy() { + delete this; +} + +// ========== impl of RedisConnContext ========== - const int consume_count = (pi.with_auth ? 1 : pi.count); +ParseResult ParseRedisMessage(butil::IOBuf* source, Socket* socket, + bool read_eof, const void* arg) { + if (read_eof || source->empty()) { + return MakeParseError(PARSE_ERROR_NOT_ENOUGH_DATA); + } + const Server* server = static_cast<const Server*>(arg); + if (server) { + RedisService* rs = server->options().redis_service; + if (!rs) { + return MakeParseError(PARSE_ERROR_TRY_OTHERS); + } + RedisConnContext* ctx = static_cast<RedisConnContext*>(socket->parsing_context()); + if (ctx == NULL) { + ctx = new RedisConnContext; + ctx->socket_id = socket->id(); + ctx->redis_service = rs; + socket->reset_parsing_context(ctx); + } + butil::Arena arena; + std::unique_ptr<const char*[]> current_commands; + int current_len = 0; + butil::IOBuf sendbuf; + ParseError err = PARSE_OK; - ParseError err = msg->response.ConsumePartialIOBuf(*source, consume_count); + err = ctx->parser.Consume(*source, &current_commands, &current_len, &arena); if (err != PARSE_OK) { - socket->GivebackPipelinedInfo(pi); return MakeParseError(err); } + while (true) { + std::unique_ptr<const char*[]> next_commands; + int next_len = 0; + err = ctx->parser.Consume(*source, &next_commands, &next_len, &arena); + if (err != PARSE_OK) { + break; + } + if (ConsumeCommand(ctx, current_commands, current_len, &arena, + false, &sendbuf) != 0) { + return MakeParseError(PARSE_ERROR_ABSOLUTELY_WRONG); + } + current_commands.swap(next_commands); + current_len = next_len; + } + if (ConsumeCommand(ctx, 
current_commands, current_len, &arena, + true /* must be last message */, &sendbuf) != 0) { + return MakeParseError(PARSE_ERROR_ABSOLUTELY_WRONG); + } Review comment: 这段写的很简洁,比之前的版本好很多。但其中的len要通过替换vector省去 ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: users@infra.apache.org With regards, Apache Git Services --------------------------------------------------------------------- To unsubscribe, e-mail: dev-unsubscribe@brpc.apache.org For additional commands, e-mail: dev-help@brpc.apache.org