This is an automated email from the ASF dual-hosted git repository.
wwbmmm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/brpc.git
The following commit(s) were added to refs/heads/master by this push:
new 353f2d0a add inline redis protocol support (#3024)
353f2d0a is described below
commit 353f2d0af11b4fb951b25ad4bb3e671d5c5e67ad
Author: Jerry Zhao <[email protected]>
AuthorDate: Fri Jul 25 19:35:33 2025 +0800
add inline redis protocol support (#3024)
* add inline redis protocol support
* complete code
* add check
* fix
* add inline unittest
* use find
* fix
---
src/brpc/redis_command.cpp | 57 ++++++++++++++++++++++++++++++++++++++++++--
test/brpc_redis_unittest.cpp | 23 +++++++++++++++++-
2 files changed, 77 insertions(+), 3 deletions(-)
diff --git a/src/brpc/redis_command.cpp b/src/brpc/redis_command.cpp
index 6eabd59b..5803aaa8 100644
--- a/src/brpc/redis_command.cpp
+++ b/src/brpc/redis_command.cpp
@@ -15,6 +15,7 @@
// specific language governing permissions and limitations
// under the License.
+#include <cctype>
#include <limits>
#include "butil/logging.h"
@@ -376,13 +377,65 @@ size_t RedisCommandParser::ParsedArgsSize() {
ParseError RedisCommandParser::Consume(butil::IOBuf& buf,
std::vector<butil::StringPiece>* args,
butil::Arena* arena) {
- const char* pfc = (const char*)buf.fetch1();
+ const auto pfc = static_cast<const char *>(buf.fetch1());
if (pfc == NULL) {
return PARSE_ERROR_NOT_ENOUGH_DATA;
}
// '*' stands for array "*<size>\r\n<sub-reply1><sub-reply2>..."
if (!_parsing_array && *pfc != '*') {
- return PARSE_ERROR_TRY_OTHERS;
+ if (!std::isalpha(static_cast<unsigned char>(*pfc))) {
+ return PARSE_ERROR_TRY_OTHERS;
+ }
+ const size_t buf_size = buf.size();
+ const auto copy_str = static_cast<char *>(arena->allocate(buf_size +
1));
+ buf.copy_to(copy_str, buf_size);
+ if (*copy_str == ' ') {
+ return PARSE_ERROR_ABSOLUTELY_WRONG;
+ }
+ copy_str[buf_size] = '\0';
+ const size_t crlf_pos = butil::StringPiece(copy_str,
buf_size).find("\r\n");
+ if (crlf_pos == butil::StringPiece::npos) { // not enough data
+ return PARSE_ERROR_NOT_ENOUGH_DATA;
+ }
+ args->clear();
+ size_t offset = 0;
+ while (offset < crlf_pos && copy_str[offset] != ' ') {
+ ++offset;
+ }
+ const auto first_arg = static_cast<char*>(arena->allocate(offset));
+ memcpy(first_arg, copy_str, offset);
+ for (size_t i = 0; i < offset; ++i) {
+ first_arg[i] = tolower(first_arg[i]);
+ }
+ args->push_back(butil::StringPiece(first_arg, offset));
+ if (offset == crlf_pos) {
+ // only one argument, directly return
+ buf.pop_front(crlf_pos + 2);
+ return PARSE_OK;
+ }
+ size_t arg_start_pos = ++offset;
+
+ for (; offset < crlf_pos; ++offset) {
+ if (copy_str[offset] != ' ') {
+ continue;
+ }
+ const auto arg_length = offset - arg_start_pos;
+ const auto arg = static_cast<char *>(arena->allocate(arg_length));
+ memcpy(arg, copy_str + arg_start_pos, arg_length);
+ args->push_back(butil::StringPiece(arg, arg_length));
+ arg_start_pos = ++offset;
+ }
+
+ if (arg_start_pos < crlf_pos) {
+ // process the last argument
+ const auto arg_length = crlf_pos - arg_start_pos;
+ const auto arg = static_cast<char *>(arena->allocate(arg_length));
+ memcpy(arg, copy_str + arg_start_pos, arg_length);
+ args->push_back(butil::StringPiece(arg, arg_length));
+ }
+
+ buf.pop_front(crlf_pos + 2);
+ return PARSE_OK;
}
// '$' stands for bulk string "$<length>\r\n<string>\r\n"
if (_parsing_array && *pfc != '$') {
diff --git a/test/brpc_redis_unittest.cpp b/test/brpc_redis_unittest.cpp
index c1e8d059..5e38c374 100644
--- a/test/brpc_redis_unittest.cpp
+++ b/test/brpc_redis_unittest.cpp
@@ -582,7 +582,7 @@ TEST_F(RedisTest, command_parser) {
ASSERT_EQ(command, GetCompleteCommand(command_out));
}
{
- // simulate parsing from network
+ // simulate parsing from network following RESP
int t = 100;
std::string
raw_string("*3\r\n$3\r\nset\r\n$3\r\nabc\r\n$3\r\ndef\r\n");
int size = raw_string.size();
@@ -602,6 +602,27 @@ TEST_F(RedisTest, command_parser) {
ASSERT_EQ(GetCompleteCommand(command_out), "set abc def");
}
}
+ {
+ // simulate parsing from network under inline protocol
+ int t = 100;
+ std::string raw_string("set abc def\r\n");
+ int size = raw_string.size();
+ while (t--) {
+ for (int i = 0; i < size; ++i) {
+ buf.push_back(raw_string[i]);
+ if (i == size - 1) {
+ ASSERT_EQ(brpc::PARSE_OK, parser.Consume(buf,
&command_out, &arena));
+ } else {
+ if (butil::fast_rand_less_than(2) == 0) {
+ ASSERT_EQ(brpc::PARSE_ERROR_NOT_ENOUGH_DATA,
+ parser.Consume(buf, &command_out, &arena));
+ }
+ }
+ }
+ ASSERT_TRUE(buf.empty());
+ ASSERT_EQ(GetCompleteCommand(command_out), "set abc def");
+ }
+ }
{
// there is a non-string message in command and parse should fail
buf.append("*3\r\n$3");
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]