jamesge commented on a change in pull request #972: Redis server protocol
URL: https://github.com/apache/incubator-brpc/pull/972#discussion_r359142218
########## File path: test/brpc_redis_unittest.cpp ########## @@ -547,4 +550,636 @@ TEST_F(RedisTest, quote_and_escape) { request.Clear(); } +std::string GetCompleteCommand(const std::unique_ptr<const char*[]>& commands) { + std::string res; + for (int i = 0; commands[i]; ++i) { + if (i != 0) { + res.push_back(' '); + } + res.append(commands[i]); + } + return res; +} + + +TEST_F(RedisTest, command_parser) { + brpc::RedisCommandParser parser; + butil::IOBuf buf; + std::unique_ptr<const char*[]> command_out; + butil::Arena arena; + int len = 0; + { + // parse from whole command + std::string command = "set abc edc"; + ASSERT_TRUE(brpc::RedisCommandNoFormat(&buf, command.c_str()).ok()); + ASSERT_EQ(brpc::PARSE_OK, parser.Consume(buf, &command_out, &len, &arena)); + ASSERT_TRUE(buf.empty()); + ASSERT_STREQ(command.c_str(), GetCompleteCommand(command_out).c_str()); + } + { + // simulate parsing from network + int t = 100; + std::string raw_string("*3\r\n$3\r\nset\r\n$3\r\nabc\r\n$3\r\ndef\r\n"); + int size = raw_string.size(); + while (t--) { + for (int i = 0; i < size; ++i) { + buf.push_back(raw_string[i]); + if (i == size - 1) { + ASSERT_EQ(brpc::PARSE_OK, parser.Consume(buf, &command_out, &len, &arena)); + } else { + if (butil::fast_rand_less_than(2) == 0) { + ASSERT_EQ(brpc::PARSE_ERROR_NOT_ENOUGH_DATA, + parser.Consume(buf, &command_out, &len, &arena)); + } + } + } + ASSERT_TRUE(buf.empty()); + ASSERT_STREQ(GetCompleteCommand(command_out).c_str(), "set abc def"); + } + } + { + // there is a non-string message in command and parse should fail + buf.append("*3\r\n$3"); + ASSERT_EQ(brpc::PARSE_ERROR_NOT_ENOUGH_DATA, parser.Consume(buf, &command_out, &len, &arena)); + ASSERT_EQ((int)buf.size(), 2); // left "$3" + buf.append("\r\nset\r\n:123\r\n$3\r\ndef\r\n"); + ASSERT_EQ(brpc::PARSE_ERROR_ABSOLUTELY_WRONG, parser.Consume(buf, &command_out, &len, &arena)); + parser.Reset(); + } + { + // not array + buf.append(":123456\r\n"); + ASSERT_EQ(brpc::PARSE_ERROR_TRY_OTHERS, 
parser.Consume(buf, &command_out, &len, &arena)); + parser.Reset(); + } + { + // not array + buf.append("+Error\r\n"); + ASSERT_EQ(brpc::PARSE_ERROR_TRY_OTHERS, parser.Consume(buf, &command_out, &len, &arena)); + parser.Reset(); + } + { + // not array + buf.append("+OK\r\n"); + ASSERT_EQ(brpc::PARSE_ERROR_TRY_OTHERS, parser.Consume(buf, &command_out, &len, &arena)); + parser.Reset(); + } + { + // not array + buf.append("$5\r\nhello\r\n"); + ASSERT_EQ(brpc::PARSE_ERROR_TRY_OTHERS, parser.Consume(buf, &command_out, &len, &arena)); + parser.Reset(); + } +} + +TEST_F(RedisTest, redis_reply_codec) { + butil::Arena arena; + // status + { + brpc::RedisReply r(&arena); + butil::IOBuf buf; + r.SetStatus("OK"); + ASSERT_TRUE(r.SerializeTo(&buf)); + ASSERT_STREQ(buf.to_string().c_str(), "+OK\r\n"); + ASSERT_STREQ(r.c_str(), "OK"); + r.Clear(); + brpc::ParseError err = r.ConsumePartialIOBuf(buf, &arena); + ASSERT_EQ(err, brpc::PARSE_OK); + ASSERT_TRUE(r.is_string()); + ASSERT_STREQ("OK", r.c_str()); + } + // error + { + brpc::RedisReply r(&arena); + butil::IOBuf buf; + r.SetError("not exist \'key\'"); + ASSERT_TRUE(r.SerializeTo(&buf)); + ASSERT_STREQ(buf.to_string().c_str(), "-not exist \'key\'\r\n"); + r.Clear(); + brpc::ParseError err = r.ConsumePartialIOBuf(buf, &arena); + ASSERT_EQ(err, brpc::PARSE_OK); + ASSERT_TRUE(r.is_error()); + ASSERT_STREQ("not exist \'key\'", r.error_message()); + } + // string + { + brpc::RedisReply r(&arena); + butil::IOBuf buf; + r.SetNullString(); + ASSERT_TRUE(r.SerializeTo(&buf)); + ASSERT_STREQ(buf.to_string().c_str(), "$-1\r\n"); + r.Clear(); + brpc::ParseError err = r.ConsumePartialIOBuf(buf, &arena); + ASSERT_EQ(err, brpc::PARSE_OK); + ASSERT_TRUE(r.is_nil()); + + r.Clear(); + r.SetString("abcde'hello world"); + ASSERT_TRUE(r.SerializeTo(&buf)); + ASSERT_STREQ(buf.to_string().c_str(), "$17\r\nabcde'hello world\r\n"); + ASSERT_STREQ(r.c_str(), "abcde'hello world"); + r.Clear(); + err = r.ConsumePartialIOBuf(buf, &arena); + ASSERT_EQ(err, 
brpc::PARSE_OK); + ASSERT_TRUE(r.is_string()); + ASSERT_STREQ(r.c_str(), "abcde'hello world"); + } + // integer + { + brpc::RedisReply r(&arena); + butil::IOBuf buf; + int t = 2; + int input[] = { -1, 1234567 }; + const char* output[] = { ":-1\r\n", ":1234567\r\n" }; + for (int i = 0; i < t; ++i) { + r.Clear(); + r.SetInteger(input[i]); + ASSERT_TRUE(r.SerializeTo(&buf)); + ASSERT_STREQ(buf.to_string().c_str(), output[i]); + r.Clear(); + brpc::ParseError err = r.ConsumePartialIOBuf(buf, &arena); + ASSERT_EQ(err, brpc::PARSE_OK); + ASSERT_TRUE(r.is_integer()); + ASSERT_EQ(r.integer(), input[i]); + } + } + // array + { + brpc::RedisReply r(&arena); + butil::IOBuf buf; + r.SetArray(3); + brpc::RedisReply& sub_reply = r[0]; + sub_reply.SetArray(2); + sub_reply[0].SetString("hello, it's me"); + sub_reply[1].SetInteger(422); + r[1].SetString("To go over everything"); + r[2].SetInteger(1); + ASSERT_TRUE(r[3].is_nil()); + ASSERT_TRUE(r.SerializeTo(&buf)); + ASSERT_STREQ(buf.to_string().c_str(), + "*3\r\n*2\r\n$14\r\nhello, it's me\r\n:422\r\n$21\r\n" + "To go over everything\r\n:1\r\n"); + r.Clear(); + ASSERT_EQ(r.ConsumePartialIOBuf(buf, &arena), brpc::PARSE_OK); + ASSERT_TRUE(r.is_array()); + ASSERT_EQ(3ul, r.size()); + ASSERT_TRUE(r[0].is_array()); + ASSERT_EQ(2ul, r[0].size()); + ASSERT_TRUE(r[0][0].is_string()); + ASSERT_STREQ(r[0][0].c_str(), "hello, it's me"); + ASSERT_TRUE(r[0][1].is_integer()); + ASSERT_EQ(r[0][1].integer(), 422); + ASSERT_TRUE(r[1].is_string()); + ASSERT_STREQ(r[1].c_str(), "To go over everything"); + ASSERT_TRUE(r[2].is_integer()); + ASSERT_EQ(1, r[2].integer()); + + r.Clear(); + // null array + r.SetNullArray(); + ASSERT_TRUE(r.SerializeTo(&buf)); + ASSERT_STREQ(buf.to_string().c_str(), "*-1\r\n"); + ASSERT_EQ(r.ConsumePartialIOBuf(buf, &arena), brpc::PARSE_OK); + ASSERT_TRUE(r.is_nil()); + } + + // CopyFromDifferentArena + { + brpc::RedisReply r(&arena); + r.SetArray(1); + brpc::RedisReply& sub_reply = r[0]; + sub_reply.SetArray(2); + 
sub_reply[0].SetString("hello, it's me"); + sub_reply[1].SetInteger(422); + + brpc::RedisReply r2(NULL); + r2.CopyFromDifferentArena(r, &arena); + ASSERT_TRUE(r2.is_array()); + ASSERT_EQ((int)r2[0].size(), 2); + ASSERT_STREQ(r2[0][0].c_str(), sub_reply[0].c_str()); + ASSERT_EQ(r2[0][1].integer(), sub_reply[1].integer()); + } + // SetXXX can be called multiple times. + { + brpc::RedisReply r(&arena); + r.SetStatus("OK"); + ASSERT_TRUE(r.is_string()); + r.SetNullString(); + ASSERT_TRUE(r.is_nil()); + r.SetArray(2); + ASSERT_TRUE(r.is_array()); + r.SetString("OK"); + ASSERT_TRUE(r.is_string()); + r.SetError("OK"); + ASSERT_TRUE(r.is_error()); + r.SetInteger(42); + ASSERT_TRUE(r.is_integer()); + } +} + +butil::Mutex s_mutex; +std::unordered_map<std::string, std::string> m; +std::unordered_map<std::string, int64_t> int_map; + +class RedisServiceImpl : public brpc::RedisService { +public: + RedisServiceImpl() + : _batch_count(0) {} + + brpc::RedisCommandHandler::Result OnBatched(int size, const char* args[], + brpc::RedisReply* output, bool is_last) { + if (_batched_command.empty() && is_last) { + if (strcasecmp(args[0], "set") == 0) { + DoSet(args[1], args[2], output); + } else if (strcasecmp(args[0], "get") == 0) { + DoGet(args[1], output); + } + return brpc::RedisCommandHandler::OK; + } + std::vector<std::string> comm; + for (int i = 0; args[i]; ++i) { + comm.push_back(args[i]); + } + _batched_command.push_back(comm); + if (is_last) { + output->SetArray(_batched_command.size()); + for (int i = 0; i < (int)_batched_command.size(); ++i) { + if (_batched_command[i][0] == "set") { + DoSet(_batched_command[i][1], _batched_command[i][2], &(*output)[i]); + } else if (_batched_command[i][0] == "get") { + DoGet(_batched_command[i][1], &(*output)[i]); + } + } + _batch_count++; + _batched_command.clear(); + return brpc::RedisCommandHandler::OK; + } else { + return brpc::RedisCommandHandler::BATCHED; + } + } + + void DoSet(const std::string& key, const std::string& value, 
brpc::RedisReply* output) { + m[key] = value; + output->SetStatus("OK"); + } + + void DoGet(const std::string& key, brpc::RedisReply* output) { + auto it = m.find(key); + if (it != m.end()) { + output->SetString(it->second); + } else { + output->SetNullString(); + } + } + + std::vector<std::vector<std::string> > _batched_command; + int _batch_count; +}; + + +class SetCommandHandler : public brpc::RedisCommandHandler { +public: + SetCommandHandler(bool batch_process = false) + : rs(NULL) + , _batch_process(batch_process) {} + + brpc::RedisCommandHandler::Result Run(int size, const char* args[], + brpc::RedisReply* output, + bool is_last) { + if (size < 3) { + output->SetError("ERR wrong number of arguments for 'set' command"); + return brpc::RedisCommandHandler::OK; + } + if (_batch_process) { + return rs->OnBatched(size, args, output, is_last); + } else { + DoSet(args[1], args[2], output); + return brpc::RedisCommandHandler::OK; + } + } + + void DoSet(const std::string& key, const std::string& value, brpc::RedisReply* output) { + m[key] = value; + output->SetStatus("OK"); + } + + RedisServiceImpl* rs; +private: + bool _batch_process; +}; + +class GetCommandHandler : public brpc::RedisCommandHandler { +public: + GetCommandHandler(bool batch_process = false) + : rs(NULL) + , _batch_process(batch_process) {} + + brpc::RedisCommandHandler::Result Run(int size, const char* args[], + brpc::RedisReply* output, + bool is_last) { + if (size < 2) { + output->SetError("ERR wrong number of arguments for 'get' command"); + return brpc::RedisCommandHandler::OK; + } + if (_batch_process) { + return rs->OnBatched(size, args, output, is_last); + } else { + DoGet(args[1], output); + return brpc::RedisCommandHandler::OK; + } + } + + void DoGet(const std::string& key, brpc::RedisReply* output) { + auto it = m.find(key); + if (it != m.end()) { + output->SetString(it->second); + } else { + output->SetNullString(); + } + } + + RedisServiceImpl* rs; +private: + bool _batch_process; +}; + 
+class IncrCommandHandler : public brpc::RedisCommandHandler { +public: + IncrCommandHandler() {} + + brpc::RedisCommandHandler::Result Run(int size, const char* args[], + brpc::RedisReply* output, + bool is_last) { + if (size < 2) { + output->SetError("ERR wrong number of arguments for 'incr' command"); + return brpc::RedisCommandHandler::OK; + } + const std::string& key = args[1]; + int64_t value; + s_mutex.lock(); + value = ++int_map[key]; + s_mutex.unlock(); + output->SetInteger(value); + return brpc::RedisCommandHandler::OK; + } +}; + +TEST_F(RedisTest, server_sanity) { + brpc::Server server; + brpc::ServerOptions server_options; + RedisServiceImpl* rsimpl = new RedisServiceImpl; + GetCommandHandler *gh = new GetCommandHandler; + SetCommandHandler *sh = new SetCommandHandler; + IncrCommandHandler *ih = new IncrCommandHandler; + rsimpl->AddCommandHandler("get", gh); + rsimpl->AddCommandHandler("set", sh); + rsimpl->AddCommandHandler("incr", ih); + server_options.redis_service = rsimpl; + brpc::PortRange pr(8081, 8900); + ASSERT_EQ(0, server.Start("127.0.0.1", pr, &server_options)); + + brpc::ChannelOptions options; + options.protocol = brpc::PROTOCOL_REDIS; + brpc::Channel channel; + ASSERT_EQ(0, channel.Init("127.0.0.1", server.listen_address().port, &options)); + + brpc::RedisRequest request; + brpc::RedisResponse response; + brpc::Controller cntl; + ASSERT_TRUE(request.AddCommand("get hello")); + ASSERT_TRUE(request.AddCommand("get hello2")); + ASSERT_TRUE(request.AddCommand("set key1 value1")); + ASSERT_TRUE(request.AddCommand("get key1")); + ASSERT_TRUE(request.AddCommand("set key2 value2")); + ASSERT_TRUE(request.AddCommand("get key2")); + ASSERT_TRUE(request.AddCommand("xxxcommand key2")); + channel.CallMethod(NULL, &cntl, &request, &response, NULL); + ASSERT_FALSE(cntl.Failed()) << cntl.ErrorText(); + ASSERT_EQ(7, response.reply_size()); + ASSERT_EQ(brpc::REDIS_REPLY_NIL, response.reply(0).type()); + ASSERT_EQ(brpc::REDIS_REPLY_NIL, 
response.reply(1).type()); + ASSERT_EQ(brpc::REDIS_REPLY_STATUS, response.reply(2).type()); + ASSERT_STREQ("OK", response.reply(2).c_str()); + ASSERT_EQ(brpc::REDIS_REPLY_STRING, response.reply(3).type()); + ASSERT_STREQ("value1", response.reply(3).c_str()); + ASSERT_EQ(brpc::REDIS_REPLY_STATUS, response.reply(4).type()); + ASSERT_STREQ("OK", response.reply(4).c_str()); + ASSERT_EQ(brpc::REDIS_REPLY_STRING, response.reply(5).type()); + ASSERT_STREQ("value2", response.reply(5).c_str()); + ASSERT_EQ(brpc::REDIS_REPLY_ERROR, response.reply(6).type()); + ASSERT_TRUE(butil::StringPiece(response.reply(6).error_message()).starts_with("ERR unknown command")); +} + +void* incr_thread(void* arg) { + brpc::Channel* c = static_cast<brpc::Channel*>(arg); + + for (int i = 0; i < 5000; ++i) { + brpc::RedisRequest request; + brpc::RedisResponse response; + brpc::Controller cntl; + EXPECT_TRUE(request.AddCommand("incr count")); + c->CallMethod(NULL, &cntl, &request, &response, NULL); + EXPECT_FALSE(cntl.Failed()) << cntl.ErrorText(); + EXPECT_EQ(1, response.reply_size()); + EXPECT_TRUE(response.reply(0).is_integer()); + } + return NULL; +} + +TEST_F(RedisTest, server_concurrency) { + int N = 10; + brpc::Server server; + brpc::ServerOptions server_options; + RedisServiceImpl* rsimpl = new RedisServiceImpl; + IncrCommandHandler *ih = new IncrCommandHandler; + rsimpl->AddCommandHandler("incr", ih); + server_options.redis_service = rsimpl; + brpc::PortRange pr(8081, 8900); + ASSERT_EQ(0, server.Start("0.0.0.0", pr, &server_options)); + + brpc::ChannelOptions options; + options.protocol = brpc::PROTOCOL_REDIS; + options.connection_type = "pooled"; + std::vector<bthread_t> bths; + std::vector<brpc::Channel*> channels; + for (int i = 0; i < N; ++i) { + channels.push_back(new brpc::Channel); + ASSERT_EQ(0, channels.back()->Init("127.0.0.1", server.listen_address().port, &options)); + bthread_t bth; + ASSERT_EQ(bthread_start_background(&bth, NULL, incr_thread, channels.back()), 0); + 
bths.push_back(bth); + } + + for (int i = 0; i < N; ++i) { + bthread_join(bths[i], NULL); + delete channels[i]; + } + ASSERT_EQ(int_map["count"], 10 * 5000LL); +} + +class MultiCommandHandler : public brpc::RedisCommandHandler { +public: + MultiCommandHandler() {} + + brpc::RedisCommandHandler::Result Run(int size, const char* args[], + brpc::RedisReply* output, + bool is_last) { + output->SetStatus("OK"); + return brpc::RedisCommandHandler::CONTINUE; + } + + RedisCommandHandler* NewTransactionHandler() { + return new MultiTransactionHandler; + } + + class MultiTransactionHandler : public brpc::RedisCommandHandler { + public: + brpc::RedisCommandHandler::Result Run(int size, const char* args[], + brpc::RedisReply* output, + bool is_last) { + if (strcasecmp(args[0], "multi") == 0) { Review comment: 这些args反正都需要框架读出来,这里其实我觉得可以在框架层面做一些保证,比如一定是小写之类的,简化用户的工作,或者减少用户犯错的概率。 ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services --------------------------------------------------------------------- To unsubscribe, e-mail: dev-unsubscr...@brpc.apache.org For additional commands, e-mail: dev-h...@brpc.apache.org