jamesge commented on a change in pull request #972: Redis server protocol URL: https://github.com/apache/incubator-brpc/pull/972#discussion_r350649862
########## File path: test/brpc_redis_unittest.cpp ########## @@ -547,4 +549,442 @@ TEST_F(RedisTest, quote_and_escape) { request.Clear(); } +TEST_F(RedisTest, codec) { + butil::Arena arena; + // status + { + brpc::RedisMessage r(&arena); + butil::IOBuf buf; + ASSERT_TRUE(r.SetStatus("OK")); + ASSERT_TRUE(r.SerializeToIOBuf(&buf)); + ASSERT_STREQ(buf.to_string().c_str(), "+OK\r\n"); + ASSERT_STREQ(r.c_str(), "OK"); + r.Clear(); + brpc::ParseError err = r.ConsumePartialIOBuf(buf, &arena); + ASSERT_EQ(err, brpc::PARSE_OK); + ASSERT_TRUE(r.is_string()); + ASSERT_STREQ("OK", r.c_str()); + } + // error + { + brpc::RedisMessage r(&arena); + butil::IOBuf buf; + ASSERT_TRUE(r.SetError("not exist \'key\'")); + ASSERT_TRUE(r.SerializeToIOBuf(&buf)); + ASSERT_STREQ(buf.to_string().c_str(), "-not exist \'key\'\r\n"); + r.Clear(); + brpc::ParseError err = r.ConsumePartialIOBuf(buf, &arena); + ASSERT_EQ(err, brpc::PARSE_OK); + ASSERT_TRUE(r.is_error()); + ASSERT_STREQ("not exist \'key\'", r.error_message()); + } + // string + { + brpc::RedisMessage r(&arena); + butil::IOBuf buf; + ASSERT_TRUE(r.SetNilString()); + ASSERT_TRUE(r.SerializeToIOBuf(&buf)); + ASSERT_STREQ(buf.to_string().c_str(), "$-1\r\n"); + r.Clear(); + brpc::ParseError err = r.ConsumePartialIOBuf(buf, &arena); + ASSERT_EQ(err, brpc::PARSE_OK); + ASSERT_TRUE(r.is_nil()); + + r.Clear(); + ASSERT_TRUE(r.SetBulkString("abcde'hello world")); + ASSERT_TRUE(r.SerializeToIOBuf(&buf)); + ASSERT_STREQ(buf.to_string().c_str(), "$17\r\nabcde'hello world\r\n"); + ASSERT_STREQ(r.c_str(), "abcde'hello world"); + r.Clear(); + err = r.ConsumePartialIOBuf(buf, &arena); + ASSERT_EQ(err, brpc::PARSE_OK); + ASSERT_TRUE(r.is_string()); + ASSERT_STREQ(r.c_str(), "abcde'hello world"); + } + // integer + { + brpc::RedisMessage r(&arena); + butil::IOBuf buf; + int t = 2; + int input[] = { -1, 1234567 }; + const char* output[] = { ":-1\r\n", ":1234567\r\n" }; + for (int i = 0; i < t; ++i) { + r.Clear(); + 
ASSERT_TRUE(r.SetInteger(input[i])); + ASSERT_TRUE(r.SerializeToIOBuf(&buf)); + ASSERT_STREQ(buf.to_string().c_str(), output[i]); + r.Clear(); + brpc::ParseError err = r.ConsumePartialIOBuf(buf, &arena); + ASSERT_EQ(err, brpc::PARSE_OK); + ASSERT_TRUE(r.is_integer()); + ASSERT_EQ(r.integer(), input[i]); + } + } + // array + { + brpc::RedisMessage r(&arena); + butil::IOBuf buf; + ASSERT_TRUE(r.SetArray(3)); + brpc::RedisMessage& sub_reply = r[0]; + sub_reply.SetArray(2); + sub_reply[0].SetBulkString("hello, it's me"); + sub_reply[1].SetInteger(422); + r[1].SetBulkString("To go over everything"); + r[2].SetInteger(1); + ASSERT_TRUE(r[3].is_nil()); + ASSERT_TRUE(r.SerializeToIOBuf(&buf)); + ASSERT_STREQ(buf.to_string().c_str(), + "*3\r\n*2\r\n$14\r\nhello, it's me\r\n:422\r\n$21\r\n" + "To go over everything\r\n:1\r\n"); + r.Clear(); + ASSERT_EQ(r.ConsumePartialIOBuf(buf, &arena), brpc::PARSE_OK); + ASSERT_TRUE(r.is_array()); + ASSERT_EQ(3ul, r.size()); + ASSERT_TRUE(r[0].is_array()); + ASSERT_EQ(2ul, r[0].size()); + ASSERT_TRUE(r[0][0].is_string()); + ASSERT_STREQ(r[0][0].c_str(), "hello, it's me"); + ASSERT_TRUE(r[0][1].is_integer()); + ASSERT_EQ(r[0][1].integer(), 422); + ASSERT_TRUE(r[1].is_string()); + ASSERT_STREQ(r[1].c_str(), "To go over everything"); + ASSERT_TRUE(r[2].is_integer()); + ASSERT_EQ(1, r[2].integer()); + + r.Clear(); + // nil array + ASSERT_TRUE(r.SetArray(-1)); + ASSERT_TRUE(r.SerializeToIOBuf(&buf)); + ASSERT_STREQ(buf.to_string().c_str(), "*-1\r\n"); + ASSERT_EQ(r.ConsumePartialIOBuf(buf, &arena), brpc::PARSE_OK); + ASSERT_TRUE(r.is_nil()); + } +} + +butil::Mutex s_mutex; +std::unordered_map<std::string, std::string> m; +std::unordered_map<std::string, int64_t> int_map; + +struct SleepArgs { + int sleep_ms; + google::protobuf::Closure* done; +}; + +void* sleep(void *arg) { + SleepArgs* args = static_cast<SleepArgs*>(arg); + bthread_usleep(args->sleep_ms * 1000); + args->done->Run(); + delete args; + return NULL; +} + +class SetCommandHandler : 
public brpc::RedisCommandHandler { +public: + SetCommandHandler(bool sleep = false) + : _sleep(sleep) {} + + brpc::RedisCommandHandler::Result Run(const char* args[], + brpc::RedisMessage* output, + google::protobuf::Closure* done) { + brpc::ClosureGuard done_guard(done); + std::string key = args[1]; + std::string value = args[2]; + m[key] = value; + output->SetStatus("OK"); + if (_sleep) { + SleepArgs *args = new SleepArgs; + args->sleep_ms = _sleep_ms; + args->done = done_guard.release(); + bthread_t bth; + EXPECT_EQ(0, bthread_start_background(&bth, NULL, sleep, args)); + if (_sleep_ms > 20) _sleep_ms -= 20; + } + return brpc::RedisCommandHandler::OK; + } + RedisCommandHandler* New() { _new_count++; return new SetCommandHandler(_sleep); } + int new_count() { return _new_count; } + +private: + int _sleep_ms = 100; + int _new_count = 0; + bool _sleep = false; +}; + +class GetCommandHandler : public brpc::RedisCommandHandler { +public: + GetCommandHandler(bool sleep = false) + : _sleep(sleep) {} + + brpc::RedisCommandHandler::Result Run(const char* args[], + brpc::RedisMessage* output, + google::protobuf::Closure* done) { + brpc::ClosureGuard done_guard(done); + std::string key = args[1]; + auto it = m.find(key); + if (it != m.end()) { + output->SetBulkString(it->second); + } else { + output->SetNilString(); + } + if (_sleep) { + SleepArgs *args = new SleepArgs; + args->sleep_ms = _sleep_ms; + args->done = done_guard.release(); + bthread_t bth; + EXPECT_EQ(0, bthread_start_background(&bth, NULL, sleep, args)); + if (_sleep_ms > 20) _sleep_ms -= 20; + } + return brpc::RedisCommandHandler::OK; + } + RedisCommandHandler* New() { _new_count++; return new GetCommandHandler(_sleep); } + int new_count() { return _new_count; } + +private: + int _sleep_ms = 100; + int _new_count = 0; + bool _sleep = false; +}; + +class IncrCommandHandler : public brpc::RedisCommandHandler { +public: + IncrCommandHandler(bool sleep = false) + : _sleep(sleep) {} + + 
brpc::RedisCommandHandler::Result Run(const char* args[], + brpc::RedisMessage* output, + google::protobuf::Closure* done) { + brpc::ClosureGuard done_guard(done); + int64_t value; + s_mutex.lock(); + value = ++int_map[args[1]]; + s_mutex.unlock(); + output->SetInteger(value); + if (_sleep) { + SleepArgs *args = new SleepArgs; + args->sleep_ms = _sleep_ms; + args->done = done_guard.release(); + bthread_t bth; + EXPECT_EQ(0, bthread_start_background(&bth, NULL, sleep, args)); + if (_sleep_ms > 20) _sleep_ms -= 20; + } + return brpc::RedisCommandHandler::OK; + } + RedisCommandHandler* New() { _new_count++; return new IncrCommandHandler(_sleep); } + int new_count() { return _new_count; } + +private: + int _sleep_ms = 100; + int _new_count = 0; + bool _sleep = false; +}; + +class RedisServiceImpl : public brpc::RedisService { }; + +TEST_F(RedisTest, server_sanity) { + brpc::Server server; + brpc::ServerOptions server_options; + RedisServiceImpl* rsimpl = new RedisServiceImpl; + GetCommandHandler *gh = new GetCommandHandler(true); + SetCommandHandler *sh = new SetCommandHandler(true); + IncrCommandHandler *ih = new IncrCommandHandler(true); + rsimpl->AddCommandHandler("get", gh); + rsimpl->AddCommandHandler("set", sh); + rsimpl->AddCommandHandler("incr", ih); + server_options.redis_service = rsimpl; + brpc::PortRange pr(8081, 8900); + ASSERT_EQ(0, server.Start("127.0.0.1", pr, &server_options)); + + brpc::ChannelOptions options; + options.protocol = brpc::PROTOCOL_REDIS; + brpc::Channel channel; + ASSERT_EQ(0, channel.Init("127.0.0.1", server.listen_address().port, &options)); + + brpc::RedisRequest request; + brpc::RedisResponse response; + brpc::Controller cntl; + ASSERT_TRUE(request.AddCommand("get hello")); + ASSERT_TRUE(request.AddCommand("get hello2")); + ASSERT_TRUE(request.AddCommand("set key1 value1")); + ASSERT_TRUE(request.AddCommand("get key1")); + ASSERT_TRUE(request.AddCommand("set key2 value2")); + ASSERT_TRUE(request.AddCommand("get key2")); + 
ASSERT_TRUE(request.AddCommand("xxxcommand key2")); + channel.CallMethod(NULL, &cntl, &request, &response, NULL); + ASSERT_FALSE(cntl.Failed()) << cntl.ErrorText(); + ASSERT_EQ(7, response.reply_size()); + ASSERT_EQ(brpc::REDIS_MESSAGE_NIL, response.reply(0).type()); + ASSERT_EQ(brpc::REDIS_MESSAGE_NIL, response.reply(1).type()); + ASSERT_EQ(brpc::REDIS_MESSAGE_STATUS, response.reply(2).type()); + ASSERT_STREQ("OK", response.reply(2).c_str()); + ASSERT_EQ(brpc::REDIS_MESSAGE_STRING, response.reply(3).type()); + ASSERT_STREQ("value1", response.reply(3).c_str()); + ASSERT_EQ(brpc::REDIS_MESSAGE_STATUS, response.reply(4).type()); + ASSERT_STREQ("OK", response.reply(4).c_str()); + ASSERT_EQ(brpc::REDIS_MESSAGE_STRING, response.reply(5).type()); + ASSERT_STREQ("value2", response.reply(5).c_str()); + ASSERT_EQ(brpc::REDIS_MESSAGE_ERROR, response.reply(6).type()); + ASSERT_TRUE(butil::StringPiece(response.reply(6).error_message()).starts_with("ERR unknown command")); + + ASSERT_EQ(gh->new_count(), 1); + ASSERT_EQ(sh->new_count(), 1); + ASSERT_EQ(ih->new_count(), 1); +} + +void* incr_thread(void* arg) { + brpc::Channel* c = static_cast<brpc::Channel*>(arg); + + for (int i = 0; i < 5000; ++i) { + brpc::RedisRequest request; + brpc::RedisResponse response; + brpc::Controller cntl; + EXPECT_TRUE(request.AddCommand("incr count")); + c->CallMethod(NULL, &cntl, &request, &response, NULL); + EXPECT_FALSE(cntl.Failed()) << cntl.ErrorText(); + EXPECT_EQ(1, response.reply_size()); + EXPECT_TRUE(response.reply(0).is_integer()); + } + return NULL; +} + +TEST_F(RedisTest, server_concurrency) { + int N = 10; + brpc::Server server; + brpc::ServerOptions server_options; + RedisServiceImpl* rsimpl = new RedisServiceImpl; + IncrCommandHandler *ih = new IncrCommandHandler; + rsimpl->AddCommandHandler("incr", ih); + server_options.redis_service = rsimpl; + brpc::PortRange pr(8081, 8900); + ASSERT_EQ(0, server.Start("0.0.0.0", pr, &server_options)); + + brpc::ChannelOptions options; + 
options.protocol = brpc::PROTOCOL_REDIS; + options.connection_type = "pooled"; + std::vector<bthread_t> bths; + std::vector<brpc::Channel*> channels; + for (int i = 0; i < N; ++i) { + channels.push_back(new brpc::Channel); + ASSERT_EQ(0, channels.back()->Init("127.0.0.1", server.listen_address().port, &options)); + bthread_t bth; + ASSERT_EQ(bthread_start_background(&bth, NULL, incr_thread, channels.back()), 0); + bths.push_back(bth); + } + + for (int i = 0; i < N; ++i) { + bthread_join(bths[i], NULL); + delete channels[i]; + } + ASSERT_EQ(int_map["count"], 10 * 5000LL); + ASSERT_EQ(ih->new_count(), N); +} + +class MultiCommandHandler : public brpc::RedisCommandHandler { +public: + RedisCommandHandler::Result Run(const char* args[], + brpc::RedisMessage* output, + google::protobuf::Closure* done) { + brpc::ClosureGuard done_guard(done); + if (strcmp(args[0], "multi") == 0) { Review comment: Should this use strcasecmp instead of strcmp, so that the command name is matched case-insensitively? ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services --------------------------------------------------------------------- To unsubscribe, e-mail: dev-unsubscr...@brpc.apache.org For additional commands, e-mail: dev-h...@brpc.apache.org